Rewrite wire reconnection logic

This commit is contained in:
luaneko 2025-01-13 01:08:18 +11:00
parent a1b66c4c48
commit 90dc51a914
Signed by: luaneko
GPG Key ID: 406809B8763FF07A

175
wire.ts
View File

@ -771,57 +771,100 @@ function wire_impl(
types, types,
}: WireOptions }: WireOptions
) { ) {
// current runtime parameters as reported by postgres
const params: Parameters = Object.create(null); const params: Parameters = Object.create(null);
function log(level: LogLevel, ctx: object, msg: string) { function log(level: LogLevel, ctx: object, msg: string) {
wire.emit("log", level, ctx, msg); wire.emit("log", level, ctx, msg);
} }
// wire supports re-connection; socket and read/write channels are null when closed
let connected = false; let connected = false;
let should_reconnect = false; let close_requested = false;
let socket: Deno.Conn | null = null; let read_queue: Receiver<Uint8Array> | null = null;
let read_pop: Receiver<Uint8Array> | null = null; let write_queue: Sender<Uint8Array> | null = null;
let write_push: Sender<Uint8Array> | null = null;
async function connect() {
using _rlock = await rlock();
using _wlock = await wlock();
if (connected) return;
else close_requested = false;
let socket: Deno.Conn | undefined;
let closed = false;
try {
const read = channel<Uint8Array>();
const write = channel<Uint8Array>();
socket = await socket_connect(host, port);
read_queue?.close(), (read_queue = read.recv);
write_queue?.close(), (write_queue = write.send);
read_socket(socket, read.send).then(onclose, onclose);
write_socket(socket, write.recv).then(onclose, onclose);
await handle_auth(); // run auth with rw lock
if (close_requested) throw new WireError(`close requested`);
else (connected = true), wire.emit("connect");
} catch (e) {
throw (onclose(e), e);
}
function onclose(reason?: unknown) {
if (closed) return;
else closed = true;
socket?.close();
for (const name of Object.keys(params))
delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0);
connected &&= (wire.emit("close", reason), reconnect(), false);
}
}
let reconnect_timer = -1;
function reconnect() {
if (close_requested || reconnect_delay === null) return;
connect().catch((err) => {
log("warn", err, `reconnect failed`);
clearTimeout(reconnect_timer);
reconnect_timer = setTimeout(reconnect, reconnect_delay);
});
}
function close(reason?: unknown) {
close_requested = true;
clearTimeout(reconnect_timer);
read_queue?.close(reason), (read_queue = null);
write_queue?.close(reason), (write_queue = null);
}
async function read<T>(type: Encoder<T>) { async function read<T>(type: Encoder<T>) {
const msg = read_pop !== null ? await read_pop() : null; const msg = read_queue !== null ? await read_queue() : null;
if (msg !== null) return ser_decode(type, msg_check_err(msg)); if (msg !== null) return ser_decode(type, msg_check_err(msg));
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
async function read_msg() { async function read_msg() {
const msg = read_pop !== null ? await read_pop() : null; const msg = read_queue !== null ? await read_queue() : null;
if (msg !== null) return msg; if (msg !== null) return msg;
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) { async function read_socket(socket: Deno.Conn, send: Sender<Uint8Array>) {
let err; const header_size = 5;
try { const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
const header_size = 5; let buf = new Uint8Array(); // concatenated messages read so far
const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
let buf = new Uint8Array(); // concatenated messages read so far
for (let read; (read = await socket.read(read_buf)) !== null; ) { for (let read; (read = await socket.read(read_buf)) !== null; ) {
buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf
while (buf.length >= header_size) { while (buf.length >= header_size) {
const size = ser_decode(Header, buf).length + 1; const size = ser_decode(Header, buf).length + 1;
if (buf.length < size) break; if (buf.length < size) break;
const msg = buf.subarray(0, size); // shift one message from buf const msg = buf.subarray(0, size); // shift one message from buf
buf = buf.subarray(size); buf = buf.subarray(size);
if (!handle_msg(msg)) push(msg); if (!handle_msg(msg)) send(msg);
}
} }
// there should be nothing left in buf if we gracefully exited
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
} catch (e) {
throw (err = e);
} finally {
onclose(err);
} }
// there should be nothing left in buf if we gracefully exited
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
} }
function handle_msg(msg: Uint8Array) { function handle_msg(msg: Uint8Array) {
@ -866,69 +909,20 @@ function wire_impl(
} }
function write_msg(buf: Uint8Array) { function write_msg(buf: Uint8Array) {
if (write_push !== null) write_push(buf); if (write_queue !== null) write_queue(buf);
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) { async function write_socket(socket: Deno.Conn, recv: Receiver<Uint8Array>) {
let err; for (let buf; (buf = await recv()) !== null; ) {
try { const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any
for (let buf; (buf = await pop()) !== null; ) { for (let i = 1, buf; (buf = recv.try()) !== null; ) bufs[i++] = buf;
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls
for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf; for (let i = 0, n = buf.length; i < n; )
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls i += await socket.write(buf.subarray(i));
for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i));
}
} catch (e) {
throw (err = e);
} finally {
onclose(err);
} }
} }
async function connect() {
using _rlock = await rlock();
using _wlock = await wlock();
if (connected) return;
try {
const s = (socket = await socket_connect(host, port));
read_pop = channel.receiver((push) => read_socket(s, push));
write_push = channel.sender((pop) => write_socket(s, pop));
await handle_auth(); // run auth with rw lock
(connected = true), (should_reconnect = reconnect_delay !== null);
wire.emit("connect");
} catch (e) {
throw (close(e), e);
}
}
function reconnect() {
if (should_reconnect) return;
connect().catch((err) => {
log("warn", err as Error, `reconnect failed`);
if (reconnect_delay !== null) setTimeout(reconnect, reconnect_delay);
});
}
function close(reason?: unknown) {
(should_reconnect = false), onclose(reason);
}
function onclose(reason?: unknown) {
if (!connected) return;
else connected = false;
socket?.close(), (socket = null);
read_pop?.close(reason), (read_pop = null);
write_push?.close(reason), (write_push = null);
for (const name of Object.keys(params))
delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0);
should_reconnect &&= (reconnect(), false);
wire.emit("close", reason);
}
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
const rlock = semaphore(); const rlock = semaphore();
const wlock = semaphore(); const wlock = semaphore();
@ -1490,9 +1484,10 @@ function wire_impl(
const tx_begin = query(sql`begin`); const tx_begin = query(sql`begin`);
const tx_commit = query(sql`commit`); const tx_commit = query(sql`commit`);
const tx_rollback = query(sql`rollback`); const tx_rollback = query(sql`rollback`);
const sp_savepoint = query(sql`savepoint __tx`); const sp_name = sql.ident`__pglue__tx`;
const sp_release = query(sql`release __tx`); const sp_savepoint = query(sql`savepoint ${sp_name}`);
const sp_rollback_to = query(sql`rollback to __tx`); const sp_release = query(sql`release ${sp_name}`);
const sp_rollback_to = query(sql`rollback to ${sp_name}`);
async function begin() { async function begin() {
const tx = new Transaction( const tx = new Transaction(