From 00002525e485de770a6380d62c5aea24b04ac1d0 Mon Sep 17 00:00:00 2001 From: luaneko Date: Sun, 12 Jan 2025 01:09:34 +1100 Subject: [PATCH] Implement wire automatic reconnection --- README.md | 4 ++-- deno.json | 2 +- mod.ts | 12 ++++------ wire.ts | 68 +++++++++++++++++++++++++++++++++++++++---------------- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 06faa60..d9d7cfc 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL. ## Installation ```ts -import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.1.3/mod.ts"; +import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.2.0/mod.ts"; // ...or from github: -import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.1.3/mod.ts"; +import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.2.0/mod.ts"; ``` ## Documentation diff --git a/deno.json b/deno.json index 00f099a..c594288 100644 --- a/deno.json +++ b/deno.json @@ -1,5 +1,5 @@ { "name": "@luaneko/pglue", - "version": "0.1.3", + "version": "0.2.0", "exports": "./mod.ts" } diff --git a/mod.ts b/mod.ts index eaea793..c4daac7 100644 --- a/mod.ts +++ b/mod.ts @@ -8,7 +8,7 @@ import { union, unknown, } from "./valita.ts"; -import { Pool, wire_connect, type LogLevel } from "./wire.ts"; +import { Pool, wire_connect } from "./wire.ts"; import { sql_types, type SqlTypeMap } from "./query.ts"; export { @@ -61,6 +61,7 @@ const ParsedOptions = object({ runtime_params: record(string()).optional(() => ({})), max_connections: number().optional(() => 10), idle_timeout: number().optional(() => 20), + reconnect_delay: number().optional(() => 5), types: record(unknown()) .optional(() => ({})) .map((types): SqlTypeMap => ({ ...sql_types, ...types })), @@ -101,10 +102,6 @@ export function connect(s: string, options: Options = {}) { postgres.connect = connect; -export type PostgresEvents = { - log(level: LogLevel, ctx: object, msg: string): void; -}; - export class Postgres extends Pool { readonly #options; @@ -113,8 +110,9 @@ export class Postgres extends Pool { this.#options = options; } - async connect() { - const wire = await wire_connect(this.#options); + async connect(options: Options = {}) { + const opts = ParsedOptions.parse({ ...this.#options, ...options }); + const wire = await wire_connect(opts); return wire.on("log", (l, c, s) => this.emit("log", l, c, s)); } } diff --git a/wire.ts b/wire.ts index ec1e6b2..2e65083 100644 --- a/wire.ts +++ b/wire.ts @@ -448,14 +448,15 @@ export interface WireOptions { readonly password: string; readonly database: string | null; readonly runtime_params: Record; + readonly reconnect_delay: number; readonly types: SqlTypeMap; } export type WireEvents = { log(level: LogLevel, ctx: object, msg: string): void; notice(notice: PostgresError): void; - parameter(name: string, value: string, prev: string | null): void; notify(channel: string, payload: string, process_id: number): void; + parameter(name: string, value: string, prev: string | null): void; close(reason?: unknown): void; }; @@ -482,7 +483,10 @@ export async function wire_connect(options: WireOptions) { return await wire.connect(), wire; } -export class Wire extends TypedEmitter implements Disposable { +export class Wire + extends TypedEmitter + implements Disposable +{ readonly #params; readonly #connect; readonly #query; @@ -575,7 +579,16 @@ async function socket_connect(hostname: string, port: number) { function wire_impl( wire: Wire, - { host, port, user, database, password, runtime_params, types }: WireOptions + { + host, + port, + user, + database, + password, + runtime_params, + reconnect_delay, + types, + }: WireOptions ) { // current runtime parameters as reported by postgres const params: Parameters = Object.create(null); @@ -586,6 +599,7 @@ function wire_impl( // wire supports re-connection; socket and read/write channels are null when closed let connected = false; + let should_reconnect = false; let socket: Deno.Conn | null = null; let read_pop: Receiver | null = null; let write_push: Sender | null = null; @@ -625,7 +639,7 @@ function wire_impl( } catch (e) { throw (err = e); } finally { - if (connected) close(err); + onclose(err); } } @@ -640,6 +654,16 @@ function wire_impl( return true; } + case NotificationResponse.type: { + const { channel, payload, process_id } = ser_decode( + NotificationResponse, + msg + ); + wire.emit("notify", channel, payload, process_id); + channels.get(channel)?.emit("notify", payload, process_id); + return true; + } + case ParameterStatus.type: { const { name, value } = ser_decode(ParameterStatus, msg); const prev = params[name] ?? null; @@ -651,16 +675,6 @@ function wire_impl( wire.emit("parameter", name, value, prev); return true; } - - case NotificationResponse.type: { - const { channel, payload, process_id } = ser_decode( - NotificationResponse, - msg - ); - wire.emit("notify", channel, payload, process_id); - channels.get(channel)?.emit("notify", payload, process_id); - return true; - } } return false; @@ -688,7 +702,7 @@ function wire_impl( } catch (e) { throw (err = e); } finally { - if (connected) close(err); + onclose(err); } } @@ -701,13 +715,26 @@ function wire_impl( read_pop = channel.receiver((push) => read_socket(s, push)); write_push = channel.sender((pop) => write_socket(s, pop)); await handle_auth(); // run auth with rw lock - connected = true; + (connected = true), (should_reconnect = reconnect_delay !== 0); } catch (e) { throw (close(e), e); } } + function reconnect() { + connect().catch((err) => { + log("warn", err as Error, `reconnect failed`); + setTimeout(reconnect, reconnect_delay); + }); + } + function close(reason?: unknown) { + (should_reconnect = false), onclose(reason); + } + + function onclose(reason?: unknown) { + if (!connected) return; + else connected = false; socket?.close(), (socket = null); read_pop?.close(reason), (read_pop = null); write_push?.close(reason), (write_push = null); @@ -715,7 +742,8 @@ function wire_impl( delete (params as Record)[name]; st_cache.clear(), (st_ids = 0); (tx_status = "I"), (tx_stack.length = 0); - connected &&= (wire.emit("close", reason), false); + should_reconnect &&= (setTimeout(reconnect, reconnect_delay), false); + wire.emit("close", reason); } // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING @@ -1390,7 +1418,7 @@ export type PoolEvents = { log(level: LogLevel, ctx: object, msg: string): void; }; -export interface PoolWire extends Wire { +export interface PoolWire extends Wire { readonly connection_id: number; readonly borrowed: boolean; release(): void; @@ -1400,8 +1428,8 @@ export interface PoolTransaction extends Transaction { readonly wire: PoolWire; } -export class Pool - extends TypedEmitter +export class Pool + extends TypedEmitter implements PromiseLike, Disposable { readonly #acquire;