Fix close event not firing on wire

This commit is contained in:
luaneko 2025-01-12 00:23:26 +11:00
parent 29b2796627
commit 29b79f25c0
Signed by: luaneko
GPG Key ID: 406809B8763FF07A

61
wire.ts
View File

@ -585,6 +585,7 @@ function wire_impl(
} }
// wire supports re-connection; socket and read/write channels are null when closed // wire supports re-connection; socket and read/write channels are null when closed
let connected = false;
let socket: Deno.Conn | null = null; let socket: Deno.Conn | null = null;
let read_pop: Receiver<Uint8Array> | null = null; let read_pop: Receiver<Uint8Array> | null = null;
let write_push: Sender<Uint8Array> | null = null; let write_push: Sender<Uint8Array> | null = null;
@ -601,25 +602,31 @@ function wire_impl(
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
// socket reader channel worker
async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) { async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) {
const header_size = 5; let err;
const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads try {
let buf = new Uint8Array(); // concatenated messages read so far const header_size = 5;
const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
let buf = new Uint8Array(); // concatenated messages read so far
for (let read; (read = await socket.read(read_buf)) !== null; ) { for (let read; (read = await socket.read(read_buf)) !== null; ) {
buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf
while (buf.length >= header_size) { while (buf.length >= header_size) {
const size = ser_decode(Header, buf).length + 1; const size = ser_decode(Header, buf).length + 1;
if (buf.length < size) break; if (buf.length < size) break;
const msg = buf.subarray(0, size); // shift one message from buf const msg = buf.subarray(0, size); // shift one message from buf
buf = buf.subarray(size); buf = buf.subarray(size);
if (!handle_msg(msg)) push(msg); if (!handle_msg(msg)) push(msg);
}
} }
}
// there should be nothing left in buf if we gracefully exited // there should be nothing left in buf if we gracefully exited
if (buf.length !== 0) throw new WireError(`unexpected end of stream`); if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
} catch (e) {
throw (err = e);
} finally {
if (connected) close(err);
}
} }
function handle_msg(msg: Uint8Array) { function handle_msg(msg: Uint8Array) {
@ -668,26 +675,33 @@ function wire_impl(
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
// socket writer channel worker
async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) { async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) {
for (let buf; (buf = await pop()) !== null; ) { let err;
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any try {
for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf; for (let buf; (buf = await pop()) !== null; ) {
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any
for (let i = 0, n = buf.length; i < n; ) for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf;
i += await socket.write(buf.subarray(i)); if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls
for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i));
}
} catch (e) {
throw (err = e);
} finally {
if (connected) close(err);
} }
} }
async function connect() { async function connect() {
using _rlock = await rlock(); using _rlock = await rlock();
using _wlock = await wlock(); using _wlock = await wlock();
close(new WireError(`reconnecting`)); if (connected) return;
try { try {
const s = (socket = await socket_connect(host, port)); const s = (socket = await socket_connect(host, port));
read_pop = channel.receiver((push) => read_socket(s, push)); read_pop = channel.receiver((push) => read_socket(s, push));
write_push = channel.sender((pop) => write_socket(s, pop)); write_push = channel.sender((pop) => write_socket(s, pop));
await handle_auth(); // run auth with rw lock await handle_auth(); // run auth with rw lock
connected = true;
} catch (e) { } catch (e) {
throw (close(e), e); throw (close(e), e);
} }
@ -701,6 +715,7 @@ function wire_impl(
delete (params as Record<string, string>)[name]; delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0); st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0); (tx_status = "I"), (tx_stack.length = 0);
connected &&= (wire.emit("close", reason), false);
} }
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING