diff --git a/wire.ts b/wire.ts index eaaefa9..ec1e6b2 100644 --- a/wire.ts +++ b/wire.ts @@ -585,6 +585,7 @@ function wire_impl( } // wire supports re-connection; socket and read/write channels are null when closed + let connected = false; let socket: Deno.Conn | null = null; let read_pop: Receiver | null = null; let write_push: Sender | null = null; @@ -601,25 +602,31 @@ function wire_impl( else throw new WireError(`connection closed`); } - // socket reader channel worker async function read_socket(socket: Deno.Conn, push: Sender) { - const header_size = 5; - const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads - let buf = new Uint8Array(); // concatenated messages read so far + let err; + try { + const header_size = 5; + const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads + let buf = new Uint8Array(); // concatenated messages read so far - for (let read; (read = await socket.read(read_buf)) !== null; ) { - buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf - while (buf.length >= header_size) { - const size = ser_decode(Header, buf).length + 1; - if (buf.length < size) break; - const msg = buf.subarray(0, size); // shift one message from buf - buf = buf.subarray(size); - if (!handle_msg(msg)) push(msg); + for (let read; (read = await socket.read(read_buf)) !== null; ) { + buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf + while (buf.length >= header_size) { + const size = ser_decode(Header, buf).length + 1; + if (buf.length < size) break; + const msg = buf.subarray(0, size); // shift one message from buf + buf = buf.subarray(size); + if (!handle_msg(msg)) push(msg); + } } - } - // there should be nothing left in buf if we gracefully exited - if (buf.length !== 0) throw new WireError(`unexpected end of stream`); + // there should be nothing left in buf if we gracefully exited + if (buf.length !== 0) throw new WireError(`unexpected end of stream`); + } catch (e) { + throw (err = e); + } finally { + if (connected) close(err); + } } function handle_msg(msg: Uint8Array) { @@ -668,26 +675,33 @@ function wire_impl( else throw new WireError(`connection closed`); } - // socket writer channel worker async function write_socket(socket: Deno.Conn, pop: Receiver) { - for (let buf; (buf = await pop()) !== null; ) { - const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any - for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf; - if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls - for (let i = 0, n = buf.length; i < n; ) - i += await socket.write(buf.subarray(i)); + let err; + try { + for (let buf; (buf = await pop()) !== null; ) { + const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any + for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf; + if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls + for (let i = 0, n = buf.length; i < n; ) + i += await socket.write(buf.subarray(i)); + } + } catch (e) { + throw (err = e); + } finally { + if (connected) close(err); } } async function connect() { using _rlock = await rlock(); using _wlock = await wlock(); - close(new WireError(`reconnecting`)); + if (connected) return; try { const s = (socket = await socket_connect(host, port)); read_pop = channel.receiver((push) => read_socket(s, push)); write_push = channel.sender((pop) => write_socket(s, pop)); await handle_auth(); // run auth with rw lock + connected = true; } catch (e) { throw (close(e), e); } @@ -701,6 +715,7 @@ function wire_impl( delete (params as Record)[name]; st_cache.clear(), (st_ids = 0); (tx_status = "I"), (tx_stack.length = 0); + connected &&= (wire.emit("close", reason), false); } // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING