From 3d65dcecf2c80efb5db7883035fdbc70f89c9996 Mon Sep 17 00:00:00 2001 From: luaneko Date: Mon, 13 Jan 2025 11:00:33 +1100 Subject: [PATCH] Add verbose protocol logging --- README.md | 4 +- deno.json | 2 +- mod.ts | 30 +------------- test.ts | 1 + wire.ts | 120 +++++++++++++++++++++++++++++------------------------- 5 files changed, 70 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 15b2d67..cb6f505 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL. ## Installation ```ts -import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.0/mod.ts"; +import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.1/mod.ts"; // ...or from github: -import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.0/mod.ts"; +import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.1/mod.ts"; ``` ## Documentation diff --git a/deno.json b/deno.json index a3a7793..32af812 100644 --- a/deno.json +++ b/deno.json @@ -1,5 +1,5 @@ { "name": "@luaneko/pglue", - "version": "0.3.0", + "version": "0.3.1", "exports": "./mod.ts" } diff --git a/mod.ts b/mod.ts index 56ec622..b79a02f 100644 --- a/mod.ts +++ b/mod.ts @@ -1,13 +1,6 @@ import pg_conn_str from "npm:pg-connection-string@^2.7.0"; import type * as v from "./valita.ts"; -import { - Pool, - PoolOptions, - SubscribeOptions, - Subscription, - Wire, - WireOptions, -} from "./wire.ts"; +import { Pool, PoolOptions, Wire, WireOptions } from "./wire.ts"; export { WireError, @@ -58,7 +51,6 @@ function parse_conn(s: string, options: Partial) { } postgres.connect = connect; -postgres.subscribe = subscribe; export async function connect(s: string, options: Partial = {}) { return await new Wire( @@ -66,15 +58,6 @@ export async function connect(s: string, options: Partial = {}) { ).connect(); } -export async function subscribe( - s: string, - options: Partial = {} -) { - return await new Subscription( - SubscribeOptions.parse(parse_conn(s, options), { mode: "strip" }) - ).connect(); -} - export type Options = v.Infer; export const Options = PoolOptions; @@ -93,15 +76,4 @@ export class Postgres extends Pool { .on("log", (l, c, s) => this.emit("log", l, c, s)) .connect(); } - - async subscribe(options: Partial = {}) { - return await new Subscription( - SubscribeOptions.parse( - { ...this.#options, ...options }, - { mode: "strip" } - ) - ) - .on("log", (l, c, s) => this.emit("log", l, c, s)) - .connect(); - } } diff --git a/test.ts b/test.ts index 7654a13..ff8be6a 100644 --- a/test.ts +++ b/test.ts @@ -4,6 +4,7 @@ import { toText } from "jsr:@std/streams"; const pool = pglue(`postgres://test:test@localhost:5432/test`, { runtime_params: { client_min_messages: "INFO" }, + verbose: true, }); pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx)); diff --git a/wire.ts b/wire.ts index 7cb2460..ca4746e 100644 --- a/wire.ts +++ b/wire.ts @@ -15,6 +15,7 @@ import { type Sender, to_base58, to_base64, + to_hex, to_utf8, TypedEmitter, } from "./lstd.ts"; @@ -472,6 +473,7 @@ export const WireOptions = v.object({ .record(v.unknown()) .optional(() => ({})) .map((types): SqlTypeMap => ({ ...sql_types, ...types })), + verbose: v.boolean().optional(() => false), }); export type WireEvents = { @@ -483,7 +485,7 @@ export type WireEvents = { close(reason?: unknown): void; }; -export type LogLevel = "debug" | "info" | "warn" | "error" | "fatal"; +export type LogLevel = "trace" | "debug" | "info" | "warn" | "error" | "fatal"; export interface Parameters extends Readonly>> {} @@ -568,16 +570,6 @@ export class Wire return this.#notify(channel, payload); } - async subscribe(options: Partial = {}) { - const { lsn } = await this.current_wal(); - return new Subscription( - SubscribeOptions.parse( - { ...this.#options, lsn, ...options }, - { mode: "strip" } - ) - ).connect(); - } - async current_setting(name: string) { return await this.query< [string] @@ -769,6 +761,7 @@ function wire_impl( runtime_params, reconnect_delay, types, + verbose, }: WireOptions ) { const params: Parameters = Object.create(null); @@ -841,7 +834,7 @@ function wire_impl( else throw new WireError(`connection closed`); } - async function read_msg() { + async function read_any() { const msg = read_queue !== null ? await read_queue() : null; if (msg !== null) return msg; else throw new WireError(`connection closed`); @@ -859,6 +852,8 @@ function wire_impl( if (buf.length < size) break; const msg = buf.subarray(0, size); // shift one message from buf buf = buf.subarray(size); + if (verbose) + log("trace", {}, `RECV <- ${msg_type(msg)} ${to_hex(msg)}`); if (!handle_msg(msg)) send(msg); } } @@ -899,25 +894,26 @@ function wire_impl( wire.emit("parameter", name, value, prev); return true; } - } - return false; + default: + return false; + } } function write(type: Encoder, value: T) { - write_msg(ser_encode(type, value)); - } - - function write_msg(buf: Uint8Array) { - if (write_queue !== null) write_queue(buf); + if (write_queue !== null) write_queue(ser_encode(type, value)); else throw new WireError(`connection closed`); } async function write_socket(socket: Deno.Conn, recv: Receiver) { for (let buf; (buf = await recv()) !== null; ) { - const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any - for (let i = 1, buf; (buf = recv.try()) !== null; ) bufs[i++] = buf; - if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls + const msgs = [buf]; // proactively dequeue more queued msgs synchronously, if any + for (let i = 1, buf; (buf = recv.try()) !== null; ) msgs[i++] = buf; + if (verbose) { + for (const msg of msgs) + log("trace", {}, `SEND -> ${msg_type(msg)} ${to_hex(msg)}`); + } + if (msgs.length !== 1) buf = buf_concat(msgs); // write queued msgs concatenated, reduce write syscalls for (let i = 0, n = buf.length; i < n; ) i += await socket.write(buf.subarray(i)); } @@ -944,7 +940,7 @@ function wire_impl( } finally { try { let msg; - while (msg_type((msg = await read_msg())) !== ReadyForQuery.type); + while (msg_type((msg = await read_any())) !== ReadyForQuery.type); ({ tx_status } = ser_decode(ReadyForQuery, msg)); } catch { // ignored @@ -983,7 +979,7 @@ function wire_impl( }); auth: for (;;) { - const msg = msg_check_err(await read_msg()); + const msg = msg_check_err(await read_any()); switch (msg_type(msg)) { case NegotiateProtocolVersion.type: { const { bad_options } = ser_decode(NegotiateProtocolVersion, msg); @@ -1027,7 +1023,7 @@ function wire_impl( // wait for ready ready: for (;;) { - const msg = msg_check_err(await read_msg()); + const msg = msg_check_err(await read_any()); switch (msg_type(msg)) { case BackendKeyData.type: continue; // ignored @@ -1163,7 +1159,7 @@ function wire_impl( await read(ParseComplete); const ser_params = make_param_ser(await read(ParameterDescription)); - const msg = msg_check_err(await read_msg()); + const msg = msg_check_err(await read_any()); const Row = msg_type(msg) === NoData.type ? EmptyRow @@ -1244,7 +1240,7 @@ function wire_impl( stdout: WritableStream | null ) { for (let rows = [], i = 0; ; ) { - const msg = msg_check_err(await read_msg()); + const msg = msg_check_err(await read_any()); switch (msg_type(msg)) { default: case DataRow.type: @@ -1278,40 +1274,47 @@ function wire_impl( } async function read_copy_out(stream: WritableStream | null) { - if (stream !== null) { - const writer = stream.getWriter(); - try { - for (let msg; msg_type((msg = await read_msg())) !== CopyDone.type; ) { - const { data } = ser_decode(CopyData, msg_check_err(msg)); - await writer.write(to_utf8(data)); + const writer = stream?.getWriter(); + try { + copy: for (;;) { + const msg = msg_check_err(await read_any()); + switch (msg_type(msg)) { + default: + case CopyData.type: { + const { data } = ser_decode(CopyData, msg); + console.log(`COPY OUT`, to_hex(data)); + await writer?.write(to_utf8(data)); + continue; + } + + case CopyDone.type: + case CommandComplete.type: // walsender sends 'C' to end of CopyBothResponse + await writer?.close(); + break copy; } - await writer.close(); - } catch (e) { - await writer.abort(e); - throw e; - } finally { - writer.releaseLock(); } - } else { - while (msg_type(msg_check_err(await read_msg())) !== CopyDone.type); + } catch (e) { + await writer?.abort(e); + throw e; + } finally { + writer?.releaseLock(); } } async function write_copy_in(stream: ReadableStream | null) { - if (stream !== null) { - const reader = stream.getReader(); - try { + const reader = stream?.getReader(); + try { + if (reader) { for (let next; !(next = await reader.read()).done; ) write(CopyData, { data: next.value }); - write(CopyDone, {}); - } catch (e) { - write(CopyFail, { cause: String(e) }); - throw e; - } finally { - reader.releaseLock(); } - } else { write(CopyDone, {}); + } catch (e) { + write(CopyFail, { cause: String(e) }); + reader?.cancel(e); + throw e; + } finally { + reader?.releaseLock(); } } @@ -1328,23 +1331,30 @@ function wire_impl( }, async () => { for (let chunks = [], err; ; ) { - const msg = await read_msg(); + const msg = await read_any(); switch (msg_type(msg)) { default: case ReadyForQuery.type: + ser_decode(ReadyForQuery, msg); if (err) throw err; else return chunks; case RowDescription.type: { const Row = make_row_ctor(ser_decode(RowDescription, msg)); const { rows } = await read_rows(Row, stdout); - chunks.push(rows); - stdout = null; + chunks.push(rows), (stdout = null); continue; } case EmptyQueryResponse.type: case CommandComplete.type: + case CopyInResponse.type: + case CopyDone.type: + continue; + + case CopyOutResponse.type: + case CopyBothResponse.type: + await read_copy_out(stdout), (stdout = null); continue; case ErrorResponse.type: { @@ -1484,7 +1494,7 @@ function wire_impl( const tx_begin = query(sql`begin`); const tx_commit = query(sql`commit`); const tx_rollback = query(sql`rollback`); - const sp_name = sql.ident`__pglue__tx`; + const sp_name = sql.ident`__pglue_tx`; const sp_savepoint = query(sql`savepoint ${sp_name}`); const sp_release = query(sql`release ${sp_name}`); const sp_rollback_to = query(sql`rollback to ${sp_name}`);