Add verbose protocol logging

This commit is contained in:
luaneko 2025-01-13 11:00:33 +11:00
parent 90dc51a914
commit 3d65dcecf2
Signed by: luaneko
GPG Key ID: 406809B8763FF07A
5 changed files with 70 additions and 87 deletions

View File

@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL.
## Installation ## Installation
```ts ```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.0/mod.ts"; import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.1/mod.ts";
// ...or from github: // ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.0/mod.ts"; import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.1/mod.ts";
``` ```
## Documentation ## Documentation

View File

@ -1,5 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.3.0", "version": "0.3.1",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

30
mod.ts
View File

@ -1,13 +1,6 @@
import pg_conn_str from "npm:pg-connection-string@^2.7.0"; import pg_conn_str from "npm:pg-connection-string@^2.7.0";
import type * as v from "./valita.ts"; import type * as v from "./valita.ts";
import { import { Pool, PoolOptions, Wire, WireOptions } from "./wire.ts";
Pool,
PoolOptions,
SubscribeOptions,
Subscription,
Wire,
WireOptions,
} from "./wire.ts";
export { export {
WireError, WireError,
@ -58,7 +51,6 @@ function parse_conn(s: string, options: Partial<WireOptions>) {
} }
postgres.connect = connect; postgres.connect = connect;
postgres.subscribe = subscribe;
export async function connect(s: string, options: Partial<WireOptions> = {}) { export async function connect(s: string, options: Partial<WireOptions> = {}) {
return await new Wire( return await new Wire(
@ -66,15 +58,6 @@ export async function connect(s: string, options: Partial<WireOptions> = {}) {
).connect(); ).connect();
} }
export async function subscribe(
s: string,
options: Partial<SubscribeOptions> = {}
) {
return await new Subscription(
SubscribeOptions.parse(parse_conn(s, options), { mode: "strip" })
).connect();
}
export type Options = v.Infer<typeof Options>; export type Options = v.Infer<typeof Options>;
export const Options = PoolOptions; export const Options = PoolOptions;
@ -93,15 +76,4 @@ export class Postgres extends Pool {
.on("log", (l, c, s) => this.emit("log", l, c, s)) .on("log", (l, c, s) => this.emit("log", l, c, s))
.connect(); .connect();
} }
async subscribe(options: Partial<SubscribeOptions> = {}) {
return await new Subscription(
SubscribeOptions.parse(
{ ...this.#options, ...options },
{ mode: "strip" }
)
)
.on("log", (l, c, s) => this.emit("log", l, c, s))
.connect();
}
} }

View File

@ -4,6 +4,7 @@ import { toText } from "jsr:@std/streams";
const pool = pglue(`postgres://test:test@localhost:5432/test`, { const pool = pglue(`postgres://test:test@localhost:5432/test`, {
runtime_params: { client_min_messages: "INFO" }, runtime_params: { client_min_messages: "INFO" },
verbose: true,
}); });
pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx)); pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx));

120
wire.ts
View File

@ -15,6 +15,7 @@ import {
type Sender, type Sender,
to_base58, to_base58,
to_base64, to_base64,
to_hex,
to_utf8, to_utf8,
TypedEmitter, TypedEmitter,
} from "./lstd.ts"; } from "./lstd.ts";
@ -472,6 +473,7 @@ export const WireOptions = v.object({
.record(v.unknown()) .record(v.unknown())
.optional(() => ({})) .optional(() => ({}))
.map((types): SqlTypeMap => ({ ...sql_types, ...types })), .map((types): SqlTypeMap => ({ ...sql_types, ...types })),
verbose: v.boolean().optional(() => false),
}); });
export type WireEvents = { export type WireEvents = {
@ -483,7 +485,7 @@ export type WireEvents = {
close(reason?: unknown): void; close(reason?: unknown): void;
}; };
export type LogLevel = "debug" | "info" | "warn" | "error" | "fatal"; export type LogLevel = "trace" | "debug" | "info" | "warn" | "error" | "fatal";
export interface Parameters extends Readonly<Partial<Record<string, string>>> {} export interface Parameters extends Readonly<Partial<Record<string, string>>> {}
@ -568,16 +570,6 @@ export class Wire<V extends WireEvents = WireEvents>
return this.#notify(channel, payload); return this.#notify(channel, payload);
} }
async subscribe(options: Partial<SubscribeOptions> = {}) {
const { lsn } = await this.current_wal();
return new Subscription(
SubscribeOptions.parse(
{ ...this.#options, lsn, ...options },
{ mode: "strip" }
)
).connect();
}
async current_setting(name: string) { async current_setting(name: string) {
return await this.query< return await this.query<
[string] [string]
@ -769,6 +761,7 @@ function wire_impl(
runtime_params, runtime_params,
reconnect_delay, reconnect_delay,
types, types,
verbose,
}: WireOptions }: WireOptions
) { ) {
const params: Parameters = Object.create(null); const params: Parameters = Object.create(null);
@ -841,7 +834,7 @@ function wire_impl(
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
async function read_msg() { async function read_any() {
const msg = read_queue !== null ? await read_queue() : null; const msg = read_queue !== null ? await read_queue() : null;
if (msg !== null) return msg; if (msg !== null) return msg;
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
@ -859,6 +852,8 @@ function wire_impl(
if (buf.length < size) break; if (buf.length < size) break;
const msg = buf.subarray(0, size); // shift one message from buf const msg = buf.subarray(0, size); // shift one message from buf
buf = buf.subarray(size); buf = buf.subarray(size);
if (verbose)
log("trace", {}, `RECV <- ${msg_type(msg)} ${to_hex(msg)}`);
if (!handle_msg(msg)) send(msg); if (!handle_msg(msg)) send(msg);
} }
} }
@ -899,25 +894,26 @@ function wire_impl(
wire.emit("parameter", name, value, prev); wire.emit("parameter", name, value, prev);
return true; return true;
} }
}
return false; default:
return false;
}
} }
function write<T>(type: Encoder<T>, value: T) { function write<T>(type: Encoder<T>, value: T) {
write_msg(ser_encode(type, value)); if (write_queue !== null) write_queue(ser_encode(type, value));
}
function write_msg(buf: Uint8Array) {
if (write_queue !== null) write_queue(buf);
else throw new WireError(`connection closed`); else throw new WireError(`connection closed`);
} }
async function write_socket(socket: Deno.Conn, recv: Receiver<Uint8Array>) { async function write_socket(socket: Deno.Conn, recv: Receiver<Uint8Array>) {
for (let buf; (buf = await recv()) !== null; ) { for (let buf; (buf = await recv()) !== null; ) {
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any const msgs = [buf]; // proactively dequeue more queued msgs synchronously, if any
for (let i = 1, buf; (buf = recv.try()) !== null; ) bufs[i++] = buf; for (let i = 1, buf; (buf = recv.try()) !== null; ) msgs[i++] = buf;
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls if (verbose) {
for (const msg of msgs)
log("trace", {}, `SEND -> ${msg_type(msg)} ${to_hex(msg)}`);
}
if (msgs.length !== 1) buf = buf_concat(msgs); // write queued msgs concatenated, reduce write syscalls
for (let i = 0, n = buf.length; i < n; ) for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i)); i += await socket.write(buf.subarray(i));
} }
@ -944,7 +940,7 @@ function wire_impl(
} finally { } finally {
try { try {
let msg; let msg;
while (msg_type((msg = await read_msg())) !== ReadyForQuery.type); while (msg_type((msg = await read_any())) !== ReadyForQuery.type);
({ tx_status } = ser_decode(ReadyForQuery, msg)); ({ tx_status } = ser_decode(ReadyForQuery, msg));
} catch { } catch {
// ignored // ignored
@ -983,7 +979,7 @@ function wire_impl(
}); });
auth: for (;;) { auth: for (;;) {
const msg = msg_check_err(await read_msg()); const msg = msg_check_err(await read_any());
switch (msg_type(msg)) { switch (msg_type(msg)) {
case NegotiateProtocolVersion.type: { case NegotiateProtocolVersion.type: {
const { bad_options } = ser_decode(NegotiateProtocolVersion, msg); const { bad_options } = ser_decode(NegotiateProtocolVersion, msg);
@ -1027,7 +1023,7 @@ function wire_impl(
// wait for ready // wait for ready
ready: for (;;) { ready: for (;;) {
const msg = msg_check_err(await read_msg()); const msg = msg_check_err(await read_any());
switch (msg_type(msg)) { switch (msg_type(msg)) {
case BackendKeyData.type: case BackendKeyData.type:
continue; // ignored continue; // ignored
@ -1163,7 +1159,7 @@ function wire_impl(
await read(ParseComplete); await read(ParseComplete);
const ser_params = make_param_ser(await read(ParameterDescription)); const ser_params = make_param_ser(await read(ParameterDescription));
const msg = msg_check_err(await read_msg()); const msg = msg_check_err(await read_any());
const Row = const Row =
msg_type(msg) === NoData.type msg_type(msg) === NoData.type
? EmptyRow ? EmptyRow
@ -1244,7 +1240,7 @@ function wire_impl(
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
) { ) {
for (let rows = [], i = 0; ; ) { for (let rows = [], i = 0; ; ) {
const msg = msg_check_err(await read_msg()); const msg = msg_check_err(await read_any());
switch (msg_type(msg)) { switch (msg_type(msg)) {
default: default:
case DataRow.type: case DataRow.type:
@ -1278,40 +1274,47 @@ function wire_impl(
} }
async function read_copy_out(stream: WritableStream<Uint8Array> | null) { async function read_copy_out(stream: WritableStream<Uint8Array> | null) {
if (stream !== null) { const writer = stream?.getWriter();
const writer = stream.getWriter(); try {
try { copy: for (;;) {
for (let msg; msg_type((msg = await read_msg())) !== CopyDone.type; ) { const msg = msg_check_err(await read_any());
const { data } = ser_decode(CopyData, msg_check_err(msg)); switch (msg_type(msg)) {
await writer.write(to_utf8(data)); default:
case CopyData.type: {
const { data } = ser_decode(CopyData, msg);
console.log(`COPY OUT`, to_hex(data));
await writer?.write(to_utf8(data));
continue;
}
case CopyDone.type:
case CommandComplete.type: // walsender sends 'C' to end of CopyBothResponse
await writer?.close();
break copy;
} }
await writer.close();
} catch (e) {
await writer.abort(e);
throw e;
} finally {
writer.releaseLock();
} }
} else { } catch (e) {
while (msg_type(msg_check_err(await read_msg())) !== CopyDone.type); await writer?.abort(e);
throw e;
} finally {
writer?.releaseLock();
} }
} }
async function write_copy_in(stream: ReadableStream<Uint8Array> | null) { async function write_copy_in(stream: ReadableStream<Uint8Array> | null) {
if (stream !== null) { const reader = stream?.getReader();
const reader = stream.getReader(); try {
try { if (reader) {
for (let next; !(next = await reader.read()).done; ) for (let next; !(next = await reader.read()).done; )
write(CopyData, { data: next.value }); write(CopyData, { data: next.value });
write(CopyDone, {});
} catch (e) {
write(CopyFail, { cause: String(e) });
throw e;
} finally {
reader.releaseLock();
} }
} else {
write(CopyDone, {}); write(CopyDone, {});
} catch (e) {
write(CopyFail, { cause: String(e) });
reader?.cancel(e);
throw e;
} finally {
reader?.releaseLock();
} }
} }
@ -1328,23 +1331,30 @@ function wire_impl(
}, },
async () => { async () => {
for (let chunks = [], err; ; ) { for (let chunks = [], err; ; ) {
const msg = await read_msg(); const msg = await read_any();
switch (msg_type(msg)) { switch (msg_type(msg)) {
default: default:
case ReadyForQuery.type: case ReadyForQuery.type:
ser_decode(ReadyForQuery, msg);
if (err) throw err; if (err) throw err;
else return chunks; else return chunks;
case RowDescription.type: { case RowDescription.type: {
const Row = make_row_ctor(ser_decode(RowDescription, msg)); const Row = make_row_ctor(ser_decode(RowDescription, msg));
const { rows } = await read_rows(Row, stdout); const { rows } = await read_rows(Row, stdout);
chunks.push(rows); chunks.push(rows), (stdout = null);
stdout = null;
continue; continue;
} }
case EmptyQueryResponse.type: case EmptyQueryResponse.type:
case CommandComplete.type: case CommandComplete.type:
case CopyInResponse.type:
case CopyDone.type:
continue;
case CopyOutResponse.type:
case CopyBothResponse.type:
await read_copy_out(stdout), (stdout = null);
continue; continue;
case ErrorResponse.type: { case ErrorResponse.type: {
@ -1484,7 +1494,7 @@ function wire_impl(
const tx_begin = query(sql`begin`); const tx_begin = query(sql`begin`);
const tx_commit = query(sql`commit`); const tx_commit = query(sql`commit`);
const tx_rollback = query(sql`rollback`); const tx_rollback = query(sql`rollback`);
const sp_name = sql.ident`__pglue__tx`; const sp_name = sql.ident`__pglue_tx`;
const sp_savepoint = query(sql`savepoint ${sp_name}`); const sp_savepoint = query(sql`savepoint ${sp_name}`);
const sp_release = query(sql`release ${sp_name}`); const sp_release = query(sql`release ${sp_name}`);
const sp_rollback_to = query(sql`rollback to ${sp_name}`); const sp_rollback_to = query(sql`rollback to ${sp_name}`);