diff --git a/wire.ts b/wire.ts index eeb4fba..7cb2460 100644 --- a/wire.ts +++ b/wire.ts @@ -771,57 +771,100 @@ function wire_impl( types, }: WireOptions ) { - // current runtime parameters as reported by postgres const params: Parameters = Object.create(null); function log(level: LogLevel, ctx: object, msg: string) { wire.emit("log", level, ctx, msg); } - // wire supports re-connection; socket and read/write channels are null when closed let connected = false; - let should_reconnect = false; - let socket: Deno.Conn | null = null; - let read_pop: Receiver | null = null; - let write_push: Sender | null = null; + let close_requested = false; + let read_queue: Receiver | null = null; + let write_queue: Sender | null = null; + + async function connect() { + using _rlock = await rlock(); + using _wlock = await wlock(); + if (connected) return; + else close_requested = false; + let socket: Deno.Conn | undefined; + let closed = false; + + try { + const read = channel(); + const write = channel(); + socket = await socket_connect(host, port); + read_queue?.close(), (read_queue = read.recv); + write_queue?.close(), (write_queue = write.send); + read_socket(socket, read.send).then(onclose, onclose); + write_socket(socket, write.recv).then(onclose, onclose); + await handle_auth(); // run auth with rw lock + + if (close_requested) throw new WireError(`close requested`); + else (connected = true), wire.emit("connect"); + } catch (e) { + throw (onclose(e), e); + } + + function onclose(reason?: unknown) { + if (closed) return; + else closed = true; + socket?.close(); + for (const name of Object.keys(params)) + delete (params as Record)[name]; + st_cache.clear(), (st_ids = 0); + (tx_status = "I"), (tx_stack.length = 0); + connected &&= (wire.emit("close", reason), reconnect(), false); + } + } + + let reconnect_timer = -1; + function reconnect() { + if (close_requested || reconnect_delay === null) return; + connect().catch((err) => { + log("warn", err, `reconnect failed`); + clearTimeout(reconnect_timer); + reconnect_timer = setTimeout(reconnect, reconnect_delay); + }); + } + + function close(reason?: unknown) { + close_requested = true; + clearTimeout(reconnect_timer); + read_queue?.close(reason), (read_queue = null); + write_queue?.close(reason), (write_queue = null); + } async function read(type: Encoder) { - const msg = read_pop !== null ? await read_pop() : null; + const msg = read_queue !== null ? await read_queue() : null; if (msg !== null) return ser_decode(type, msg_check_err(msg)); else throw new WireError(`connection closed`); } async function read_msg() { - const msg = read_pop !== null ? await read_pop() : null; + const msg = read_queue !== null ? await read_queue() : null; if (msg !== null) return msg; else throw new WireError(`connection closed`); } - async function read_socket(socket: Deno.Conn, push: Sender) { - let err; - try { - const header_size = 5; - const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads - let buf = new Uint8Array(); // concatenated messages read so far + async function read_socket(socket: Deno.Conn, send: Sender) { + const header_size = 5; + const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads + let buf = new Uint8Array(); // concatenated messages read so far - for (let read; (read = await socket.read(read_buf)) !== null; ) { - buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf - while (buf.length >= header_size) { - const size = ser_decode(Header, buf).length + 1; - if (buf.length < size) break; - const msg = buf.subarray(0, size); // shift one message from buf - buf = buf.subarray(size); - if (!handle_msg(msg)) push(msg); - } + for (let read; (read = await socket.read(read_buf)) !== null; ) { + buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf + while (buf.length >= header_size) { + const size = ser_decode(Header, buf).length + 1; + if (buf.length < size) break; + const msg = buf.subarray(0, size); // shift one message from buf + buf = buf.subarray(size); + if (!handle_msg(msg)) send(msg); } - - // there should be nothing left in buf if we gracefully exited - if (buf.length !== 0) throw new WireError(`unexpected end of stream`); - } catch (e) { - throw (err = e); - } finally { - onclose(err); } + + // there should be nothing left in buf if we gracefully exited + if (buf.length !== 0) throw new WireError(`unexpected end of stream`); } function handle_msg(msg: Uint8Array) { @@ -866,69 +909,20 @@ function wire_impl( } function write_msg(buf: Uint8Array) { - if (write_push !== null) write_push(buf); + if (write_queue !== null) write_queue(buf); else throw new WireError(`connection closed`); } - async function write_socket(socket: Deno.Conn, pop: Receiver) { - let err; - try { - for (let buf; (buf = await pop()) !== null; ) { - const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any - for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf; - if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls - for (let i = 0, n = buf.length; i < n; ) - i += await socket.write(buf.subarray(i)); - } - } catch (e) { - throw (err = e); - } finally { - onclose(err); + async function write_socket(socket: Deno.Conn, recv: Receiver) { + for (let buf; (buf = await recv()) !== null; ) { + const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any + for (let i = 1, buf; (buf = recv.try()) !== null; ) bufs[i++] = buf; + if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls + for (let i = 0, n = buf.length; i < n; ) + i += await socket.write(buf.subarray(i)); } } - async function connect() { - using _rlock = await rlock(); - using _wlock = await wlock(); - if (connected) return; - try { - const s = (socket = await socket_connect(host, port)); - read_pop = channel.receiver((push) => read_socket(s, push)); - write_push = channel.sender((pop) => write_socket(s, pop)); - await handle_auth(); // run auth with rw lock - (connected = true), (should_reconnect = reconnect_delay !== null); - wire.emit("connect"); - } catch (e) { - throw (close(e), e); - } - } - - function reconnect() { - if (should_reconnect) return; - connect().catch((err) => { - log("warn", err as Error, `reconnect failed`); - if (reconnect_delay !== null) setTimeout(reconnect, reconnect_delay); - }); - } - - function close(reason?: unknown) { - (should_reconnect = false), onclose(reason); - } - - function onclose(reason?: unknown) { - if (!connected) return; - else connected = false; - socket?.close(), (socket = null); - read_pop?.close(reason), (read_pop = null); - write_push?.close(reason), (write_push = null); - for (const name of Object.keys(params)) - delete (params as Record)[name]; - st_cache.clear(), (st_ids = 0); - (tx_status = "I"), (tx_stack.length = 0); - should_reconnect &&= (reconnect(), false); - wire.emit("close", reason); - } - // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING const rlock = semaphore(); const wlock = semaphore(); @@ -1490,9 +1484,10 @@ function wire_impl( const tx_begin = query(sql`begin`); const tx_commit = query(sql`commit`); const tx_rollback = query(sql`rollback`); - const sp_savepoint = query(sql`savepoint __tx`); - const sp_release = query(sql`release __tx`); - const sp_rollback_to = query(sql`rollback to __tx`); + const sp_name = sql.ident`__pglue__tx`; + const sp_savepoint = query(sql`savepoint ${sp_name}`); + const sp_release = query(sql`release ${sp_name}`); + const sp_rollback_to = query(sql`rollback to ${sp_name}`); async function begin() { const tx = new Transaction(