2 Commits

Author SHA1 Message Date
a4c0055c79 Return row value direct in .first() and .first_or(x) 2025-01-12 01:25:55 +11:00
00002525e4 Implement wire automatic reconnection 2025-01-12 01:09:34 +11:00
6 changed files with 99 additions and 86 deletions

View File

@@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL.
## Installation ## Installation
```ts ```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.1.3/mod.ts"; import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.0/mod.ts";
// ...or from github: // ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.1.3/mod.ts"; import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.0/mod.ts";
``` ```
## Documentation ## Documentation

View File

@@ -1,5 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.1.3", "version": "0.3.0",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

19
mod.ts
View File

@@ -8,7 +8,7 @@ import {
union, union,
unknown, unknown,
} from "./valita.ts"; } from "./valita.ts";
import { Pool, wire_connect, type LogLevel } from "./wire.ts"; import { Pool, wire_connect } from "./wire.ts";
import { sql_types, type SqlTypeMap } from "./query.ts"; import { sql_types, type SqlTypeMap } from "./query.ts";
export { export {
@@ -27,11 +27,10 @@ export {
sql, sql,
is_sql, is_sql,
Query, Query,
type Row,
type CommandResult,
type Result, type Result,
type Results, type Row,
type ResultStream, type Rows,
type RowStream,
} from "./query.ts"; } from "./query.ts";
export type Options = { export type Options = {
@@ -61,6 +60,7 @@ const ParsedOptions = object({
runtime_params: record(string()).optional(() => ({})), runtime_params: record(string()).optional(() => ({})),
max_connections: number().optional(() => 10), max_connections: number().optional(() => 10),
idle_timeout: number().optional(() => 20), idle_timeout: number().optional(() => 20),
reconnect_delay: number().optional(() => 5),
types: record(unknown()) types: record(unknown())
.optional(() => ({})) .optional(() => ({}))
.map((types): SqlTypeMap => ({ ...sql_types, ...types })), .map((types): SqlTypeMap => ({ ...sql_types, ...types })),
@@ -101,10 +101,6 @@ export function connect(s: string, options: Options = {}) {
postgres.connect = connect; postgres.connect = connect;
export type PostgresEvents = {
log(level: LogLevel, ctx: object, msg: string): void;
};
export class Postgres extends Pool { export class Postgres extends Pool {
readonly #options; readonly #options;
@@ -113,8 +109,9 @@ export class Postgres extends Pool {
this.#options = options; this.#options = options;
} }
async connect() { async connect(options: Options = {}) {
const wire = await wire_connect(this.#options); const opts = ParsedOptions.parse({ ...this.#options, ...options });
const wire = await wire_connect(opts);
return wire.on("log", (l, c, s) => this.emit("log", l, c, s)); return wire.on("log", (l, c, s) => this.emit("log", l, c, s));
} }
} }

View File

@@ -323,22 +323,15 @@ export const sql_types: SqlTypeMap = {
sql.types = sql_types; sql.types = sql_types;
type ReadonlyTuple<T extends readonly unknown[]> = readonly [...T]; export interface Result {
export interface CommandResult {
readonly tag: string; readonly tag: string;
} }
export interface Result<T> extends CommandResult, ReadonlyTuple<[T]> { export interface Rows<T> extends Result, ReadonlyArray<T> {
readonly row: T;
}
export interface Results<T> extends CommandResult, ReadonlyArray<T> {
readonly rows: ReadonlyArray<T>; readonly rows: ReadonlyArray<T>;
} }
export interface ResultStream<T> export interface RowStream<T> extends AsyncIterable<T[], Result, void> {}
extends AsyncIterable<T[], CommandResult, void> {}
export interface Row extends Iterable<unknown, void, void> { export interface Row extends Iterable<unknown, void, void> {
[column: string]: unknown; [column: string]: unknown;
@@ -351,12 +344,10 @@ export interface QueryOptions {
readonly stdout: WritableStream<Uint8Array> | null; readonly stdout: WritableStream<Uint8Array> | null;
} }
export class Query<T = Row> export class Query<T = Row> implements PromiseLike<Rows<T>>, RowStream<T> {
implements PromiseLike<Results<T>>, ResultStream<T>
{
readonly #f; readonly #f;
constructor(f: (options: Partial<QueryOptions>) => ResultStream<T>) { constructor(f: (options: Partial<QueryOptions>) => RowStream<T>) {
this.#f = f; this.#f = f;
} }
@@ -431,20 +422,18 @@ export class Query<T = Row>
return this.#f(options); return this.#f(options);
} }
async first(): Promise<Result<T>> { async first(): Promise<T> {
const { rows, tag } = await this.collect(1); const rows = await this.collect(1);
if (!rows.length) throw new TypeError(`expected one row, got none instead`); if (rows.length !== 0) return rows[0];
const row = rows[0]; else throw new TypeError(`expected one row, got none instead`);
return Object.assign([row] as const, { row: rows[0], tag });
} }
async first_or<S>(value: S): Promise<Result<T | S>> { async first_or<S>(value: S): Promise<T | S> {
const { rows, tag } = await this.collect(1); const rows = await this.collect(1);
const row = rows.length ? rows[0] : value; return rows.length !== 0 ? rows[0] : value;
return Object.assign([row] as const, { row: rows[0], tag });
} }
async collect(count = Number.POSITIVE_INFINITY): Promise<Results<T>> { async collect(count = Number.POSITIVE_INFINITY): Promise<Rows<T>> {
const iter = this[Symbol.asyncIterator](); const iter = this[Symbol.asyncIterator]();
let next; let next;
const rows = []; const rows = [];
@@ -470,8 +459,8 @@ export class Query<T = Row>
return n; return n;
} }
then<S = Results<T>, U = never>( then<S = Rows<T>, U = never>(
f?: ((rows: Results<T>) => S | PromiseLike<S>) | null, f?: ((rows: Rows<T>) => S | PromiseLike<S>) | null,
g?: ((reason?: unknown) => U | PromiseLike<U>) | null g?: ((reason?: unknown) => U | PromiseLike<U>) | null
) { ) {
return this.collect().then(f, g); return this.collect().then(f, g);

12
test.ts
View File

@@ -16,7 +16,7 @@ Deno.test(`integers`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query` const { a, b, c } = await pg.query`
select select
${"0x100"}::int2 as a, ${"0x100"}::int2 as a,
${777}::int4 as b, ${777}::int4 as b,
@@ -32,7 +32,7 @@ Deno.test(`integers`, async () => {
expect(b).toBe(777); expect(b).toBe(777);
expect(c).toBe(1234); expect(c).toBe(1234);
const [{ large }] = const { large } =
await pg.query`select ${"10000000000000000"}::int8 as large`.first(); await pg.query`select ${"10000000000000000"}::int8 as large`.first();
expect(large).toBe(10000000000000000n); expect(large).toBe(10000000000000000n);
@@ -47,7 +47,7 @@ Deno.test(`boolean`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query` const { a, b, c } = await pg.query`
select select
${true}::bool as a, ${true}::bool as a,
${"n"}::bool as b, ${"n"}::bool as b,
@@ -63,7 +63,7 @@ Deno.test(`bytea`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ string, array, buffer }] = await pg.query` const { string, array, buffer } = await pg.query`
select select
${"hello, world"}::bytea as string, ${"hello, world"}::bytea as string,
${[1, 2, 3, 4, 5]}::bytea as array, ${[1, 2, 3, 4, 5]}::bytea as array,
@@ -93,7 +93,7 @@ Deno.test(`row`, async () => {
).tag ).tag
).toBe(`COPY 1`); ).toBe(`COPY 1`);
const [row] = await pg.query`select * from my_table`.first(); const row = await pg.query`select * from my_table`.first();
{ {
// columns by name // columns by name
const { a, b, c } = row; const { a, b, c } = row;
@@ -132,7 +132,7 @@ Deno.test(`sql injection`, async () => {
`INSERT 0 1` `INSERT 0 1`
); );
const [{ name }] = await pg.query<{ name: string }>` const { name } = await pg.query<{ name: string }>`
select name from users select name from users
`.first(); `.first();

105
wire.ts
View File

@@ -35,11 +35,11 @@ import {
type EncoderType, type EncoderType,
} from "./ser.ts"; } from "./ser.ts";
import { import {
type CommandResult,
format, format,
is_sql, is_sql,
Query, Query,
type ResultStream, type Result,
type RowStream,
type Row, type Row,
sql, sql,
type SqlFragment, type SqlFragment,
@@ -448,33 +448,34 @@ export interface WireOptions {
readonly password: string; readonly password: string;
readonly database: string | null; readonly database: string | null;
readonly runtime_params: Record<string, string>; readonly runtime_params: Record<string, string>;
readonly reconnect_delay: number;
readonly types: SqlTypeMap; readonly types: SqlTypeMap;
} }
export type WireEvents = { export type WireEvents = {
log(level: LogLevel, ctx: object, msg: string): void; log(level: LogLevel, ctx: object, msg: string): void;
notice(notice: PostgresError): void; notice(notice: PostgresError): void;
parameter(name: string, value: string, prev: string | null): void;
notify(channel: string, payload: string, process_id: number): void; notify(channel: string, payload: string, process_id: number): void;
parameter(name: string, value: string, prev: string | null): void;
close(reason?: unknown): void; close(reason?: unknown): void;
}; };
export interface Transaction extends CommandResult, AsyncDisposable { export interface Transaction extends Result, AsyncDisposable {
readonly open: boolean; readonly open: boolean;
commit(): Promise<CommandResult>; commit(): Promise<Result>;
rollback(): Promise<CommandResult>; rollback(): Promise<Result>;
} }
export type ChannelEvents = { notify: NotificationHandler }; export type ChannelEvents = { notify: NotificationHandler };
export type NotificationHandler = (payload: string, process_id: number) => void; export type NotificationHandler = (payload: string, process_id: number) => void;
export interface Channel export interface Channel
extends TypedEmitter<ChannelEvents>, extends TypedEmitter<ChannelEvents>,
CommandResult, Result,
AsyncDisposable { AsyncDisposable {
readonly name: string; readonly name: string;
readonly open: boolean; readonly open: boolean;
notify(payload: string): Promise<CommandResult>; notify(payload: string): Promise<Result>;
unlisten(): Promise<CommandResult>; unlisten(): Promise<Result>;
} }
export async function wire_connect(options: WireOptions) { export async function wire_connect(options: WireOptions) {
@@ -482,7 +483,10 @@ export async function wire_connect(options: WireOptions) {
return await wire.connect(), wire; return await wire.connect(), wire;
} }
export class Wire extends TypedEmitter<WireEvents> implements Disposable { export class Wire<V extends WireEvents = WireEvents>
extends TypedEmitter<V>
implements Disposable
{
readonly #params; readonly #params;
readonly #connect; readonly #connect;
readonly #query; readonly #query;
@@ -542,16 +546,15 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
} }
async get(param: string) { async get(param: string) {
return ( return await this.query`select current_setting(${param}, true)`
await this.query`select current_setting(${param}, true)`
.map(([s]) => String(s)) .map(([s]) => String(s))
.first_or(null) .first_or(null);
)[0];
} }
async set(param: string, value: string, local = false) { async set(param: string, value: string, local = false) {
return await this return await this.query`select set_config(${param}, ${value}, ${local})`
.query`select set_config(${param}, ${value}, ${local})`.execute(); .map(([s]) => String(s))
.first();
} }
close(reason?: unknown) { close(reason?: unknown) {
@@ -575,7 +578,16 @@ async function socket_connect(hostname: string, port: number) {
function wire_impl( function wire_impl(
wire: Wire, wire: Wire,
{ host, port, user, database, password, runtime_params, types }: WireOptions {
host,
port,
user,
database,
password,
runtime_params,
reconnect_delay,
types,
}: WireOptions
) { ) {
// current runtime parameters as reported by postgres // current runtime parameters as reported by postgres
const params: Parameters = Object.create(null); const params: Parameters = Object.create(null);
@@ -586,6 +598,7 @@ function wire_impl(
// wire supports re-connection; socket and read/write channels are null when closed // wire supports re-connection; socket and read/write channels are null when closed
let connected = false; let connected = false;
let should_reconnect = false;
let socket: Deno.Conn | null = null; let socket: Deno.Conn | null = null;
let read_pop: Receiver<Uint8Array> | null = null; let read_pop: Receiver<Uint8Array> | null = null;
let write_push: Sender<Uint8Array> | null = null; let write_push: Sender<Uint8Array> | null = null;
@@ -625,7 +638,7 @@ function wire_impl(
} catch (e) { } catch (e) {
throw (err = e); throw (err = e);
} finally { } finally {
if (connected) close(err); onclose(err);
} }
} }
@@ -640,6 +653,16 @@ function wire_impl(
return true; return true;
} }
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
return true;
}
case ParameterStatus.type: { case ParameterStatus.type: {
const { name, value } = ser_decode(ParameterStatus, msg); const { name, value } = ser_decode(ParameterStatus, msg);
const prev = params[name] ?? null; const prev = params[name] ?? null;
@@ -651,16 +674,6 @@ function wire_impl(
wire.emit("parameter", name, value, prev); wire.emit("parameter", name, value, prev);
return true; return true;
} }
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
return true;
}
} }
return false; return false;
@@ -688,7 +701,7 @@ function wire_impl(
} catch (e) { } catch (e) {
throw (err = e); throw (err = e);
} finally { } finally {
if (connected) close(err); onclose(err);
} }
} }
@@ -701,13 +714,26 @@ function wire_impl(
read_pop = channel.receiver((push) => read_socket(s, push)); read_pop = channel.receiver((push) => read_socket(s, push));
write_push = channel.sender((pop) => write_socket(s, pop)); write_push = channel.sender((pop) => write_socket(s, pop));
await handle_auth(); // run auth with rw lock await handle_auth(); // run auth with rw lock
connected = true; (connected = true), (should_reconnect = reconnect_delay !== 0);
} catch (e) { } catch (e) {
throw (close(e), e); throw (close(e), e);
} }
} }
function reconnect() {
connect().catch((err) => {
log("warn", err as Error, `reconnect failed`);
setTimeout(reconnect, reconnect_delay);
});
}
function close(reason?: unknown) { function close(reason?: unknown) {
(should_reconnect = false), onclose(reason);
}
function onclose(reason?: unknown) {
if (!connected) return;
else connected = false;
socket?.close(), (socket = null); socket?.close(), (socket = null);
read_pop?.close(reason), (read_pop = null); read_pop?.close(reason), (read_pop = null);
write_push?.close(reason), (write_push = null); write_push?.close(reason), (write_push = null);
@@ -715,7 +741,8 @@ function wire_impl(
delete (params as Record<string, string>)[name]; delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0); st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0); (tx_status = "I"), (tx_stack.length = 0);
connected &&= (wire.emit("close", reason), false); should_reconnect &&= (setTimeout(reconnect, reconnect_delay), false);
wire.emit("close", reason);
} }
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
@@ -1115,7 +1142,7 @@ function wire_impl(
query: string, query: string,
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
yield* await pipeline( yield* await pipeline(
() => { () => {
log("debug", { query }, `executing simple query`); log("debug", { query }, `executing simple query`);
@@ -1161,7 +1188,7 @@ function wire_impl(
params: unknown[], params: unknown[],
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
const { query, name: statement } = st; const { query, name: statement } = st;
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
@@ -1210,7 +1237,7 @@ function wire_impl(
chunk_size: number, chunk_size: number,
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
const { query, name: statement } = st; const { query, name: statement } = st;
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
@@ -1298,7 +1325,7 @@ function wire_impl(
return tx_stack.indexOf(this) !== -1; return tx_stack.indexOf(this) !== -1;
} }
constructor(begin: CommandResult) { constructor(begin: Result) {
Object.assign(this, begin); Object.assign(this, begin);
} }
@@ -1356,7 +1383,7 @@ function wire_impl(
return channels.get(this.#name) === this; return channels.get(this.#name) === this;
} }
constructor(name: string, listen: CommandResult) { constructor(name: string, listen: Result) {
super(); super();
Object.assign(this, listen); Object.assign(this, listen);
this.#name = name; this.#name = name;
@@ -1390,7 +1417,7 @@ export type PoolEvents = {
log(level: LogLevel, ctx: object, msg: string): void; log(level: LogLevel, ctx: object, msg: string): void;
}; };
export interface PoolWire extends Wire { export interface PoolWire<V extends WireEvents = WireEvents> extends Wire<V> {
readonly connection_id: number; readonly connection_id: number;
readonly borrowed: boolean; readonly borrowed: boolean;
release(): void; release(): void;
@@ -1400,8 +1427,8 @@ export interface PoolTransaction extends Transaction {
readonly wire: PoolWire; readonly wire: PoolWire;
} }
export class Pool export class Pool<V extends PoolEvents = PoolEvents>
extends TypedEmitter<PoolEvents> extends TypedEmitter<V>
implements PromiseLike<PoolWire>, Disposable implements PromiseLike<PoolWire>, Disposable
{ {
readonly #acquire; readonly #acquire;