8 Commits

7 changed files with 348 additions and 316 deletions

View File

@@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL.
## Installation ## Installation
```ts ```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.1.3/mod.ts"; import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.0/mod.ts";
// ...or from github: // ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.1.3/mod.ts"; import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.0/mod.ts";
``` ```
## Documentation ## Documentation
@@ -25,13 +25,13 @@ TODO: Write the documentation in more detail here.
## Benchmarks ## Benchmarks
Performance is generally on par with [postgres-js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance. Performance is generally on par with [postgres.js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost: Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
Query `select * from pg_type`: Query `select * from pg_type`:
```log ```
CPU | Common KVM Processor v2.0 CPU | Common KVM Processor v2.0
Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu) Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu)
@@ -39,78 +39,78 @@ benchmark time/iter (avg) iter/s (min … max) p75
--------------- ----------------------------- --------------------- -------------------------- --------------- ----------------------------- --------------------- --------------------------
group select n=1 group select n=1
pglue 8.3 ms 120.4 ( 7.2 ms … 14.4 ms) 8.5 ms 14.4 ms 14.4 ms pglue 8.8 ms 113.8 ( 7.2 ms … 11.8 ms) 9.7 ms 11.8 ms 11.8 ms
postgres-js 10.8 ms 92.3 ( 8.1 ms … 26.5 ms) 10.7 ms 26.5 ms 26.5 ms postgres.js 10.8 ms 92.3 ( 8.1 ms … 22.0 ms) 11.2 ms 22.0 ms 22.0 ms
deno-postgres 37.1 ms 26.9 ( 33.4 ms … 41.3 ms) 38.5 ms 41.3 ms 41.3 ms deno-postgres 38.9 ms 25.7 ( 23.5 ms … 51.9 ms) 40.3 ms 51.9 ms 51.9 ms
summary summary
pglue pglue
1.30x faster than postgres-js 1.23x faster than postgres.js
4.47x faster than deno-postgres 4.42x faster than deno-postgres
group select n=5 group select n=5
pglue 39.9 ms 25.1 ( 37.2 ms … 49.6 ms) 40.8 ms 49.6 ms 49.6 ms pglue 40.1 ms 25.0 ( 36.1 ms … 48.2 ms) 40.7 ms 48.2 ms 48.2 ms
postgres-js 42.4 ms 23.6 ( 36.5 ms … 61.8 ms) 44.2 ms 61.8 ms 61.8 ms postgres.js 48.7 ms 20.5 ( 38.9 ms … 61.2 ms) 52.7 ms 61.2 ms 61.2 ms
deno-postgres 182.5 ms 5.5 (131.9 ms … 211.8 ms) 193.4 ms 211.8 ms 211.8 ms deno-postgres 184.7 ms 5.4 (166.5 ms … 209.5 ms) 190.7 ms 209.5 ms 209.5 ms
summary summary
pglue pglue
1.06x faster than postgres-js 1.22x faster than postgres.js
4.57x faster than deno-postgres 4.61x faster than deno-postgres
group select n=10 group select n=10
pglue 78.9 ms 12.7 ( 72.3 ms … 88.9 ms) 82.5 ms 88.9 ms 88.9 ms pglue 80.7 ms 12.4 ( 73.5 ms … 95.4 ms) 82.2 ms 95.4 ms 95.4 ms
postgres-js 92.0 ms 10.9 ( 77.6 ms … 113.6 ms) 101.2 ms 113.6 ms 113.6 ms postgres.js 89.1 ms 11.2 ( 82.5 ms … 101.7 ms) 94.4 ms 101.7 ms 101.7 ms
deno-postgres 326.6 ms 3.1 (208.8 ms … 406.0 ms) 388.8 ms 406.0 ms 406.0 ms deno-postgres 375.3 ms 2.7 (327.4 ms … 393.9 ms) 390.7 ms 393.9 ms 393.9 ms
summary summary
pglue pglue
1.17x faster than postgres-js 1.10x faster than postgres.js
4.14x faster than deno-postgres 4.65x faster than deno-postgres
``` ```
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`: Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
```log ```
group insert n=1 group insert n=1
pglue 303.3 µs 3,297 (165.6 µs … 2.4 ms) 321.6 µs 1.1 ms 2.4 ms pglue 259.2 µs 3,858 (165.4 µs … 2.8 ms) 258.0 µs 775.4 µs 2.8 ms
postgres-js 260.4 µs 3,840 (132.9 µs … 2.7 ms) 276.4 µs 1.1 ms 2.7 ms postgres.js 235.9 µs 4,239 (148.8 µs … 1.2 ms) 250.3 µs 577.4 µs 585.6 µs
deno-postgres 281.6 µs 3,552 (186.1 µs … 1.5 ms) 303.8 µs 613.6 µs 791.8 µs deno-postgres 306.7 µs 3,260 (198.8 µs … 1.3 ms) 325.9 µs 1.0 ms 1.3 ms
summary summary
pglue pglue
1.17x slower than postgres-js 1.10x slower than postgres.js
1.08x slower than deno-postgres 1.18x faster than deno-postgres
group insert n=10 group insert n=10
pglue 1.1 ms 878.5 (605.5 µs … 3.2 ms) 1.1 ms 2.2 ms 3.2 ms pglue 789.7 µs 1,266 (553.2 µs … 2.7 ms) 783.4 µs 2.4 ms 2.7 ms
postgres-js 849.3 µs 1,177 (529.5 µs … 10.1 ms) 770.6 µs 3.0 ms 10.1 ms postgres.js 755.6 µs 1,323 (500.5 µs … 3.4 ms) 795.0 µs 2.8 ms 3.4 ms
deno-postgres 2.3 ms 439.4 ( 1.4 ms … 4.9 ms) 2.5 ms 4.1 ms 4.9 ms deno-postgres 2.2 ms 458.1 ( 1.6 ms … 5.2 ms) 2.3 ms 4.8 ms 5.2 ms
summary summary
pglue pglue
1.34x slower than postgres-js 1.04x slower than postgres.js
2.00x faster than deno-postgres 2.76x faster than deno-postgres
group insert n=100 group insert n=100
pglue 8.3 ms 121.0 ( 5.0 ms … 13.6 ms) 9.3 ms 13.6 ms 13.6 ms pglue 5.8 ms 172.0 ( 3.2 ms … 9.9 ms) 6.8 ms 9.9 ms 9.9 ms
postgres-js 13.0 ms 76.7 ( 9.0 ms … 26.9 ms) 14.1 ms 26.9 ms 26.9 ms postgres.js 13.0 ms 76.8 ( 8.6 ms … 20.8 ms) 15.4 ms 20.8 ms 20.8 ms
deno-postgres 19.8 ms 50.5 ( 14.2 ms … 31.8 ms) 22.5 ms 31.8 ms 31.8 ms deno-postgres 18.5 ms 54.1 ( 14.3 ms … 32.1 ms) 20.0 ms 32.1 ms 32.1 ms
summary summary
pglue pglue
1.58x faster than postgres-js 2.24x faster than postgres.js
2.40x faster than deno-postgres 3.18x faster than deno-postgres
group insert n=200 group insert n=200
pglue 15.1 ms 66.2 ( 9.4 ms … 21.1 ms) 16.8 ms 21.1 ms 21.1 ms pglue 8.8 ms 113.4 ( 6.0 ms … 14.1 ms) 10.0 ms 14.1 ms 14.1 ms
postgres-js 27.8 ms 36.0 ( 22.5 ms … 39.2 ms) 30.2 ms 39.2 ms 39.2 ms postgres.js 28.2 ms 35.5 ( 21.1 ms … 47.0 ms) 29.6 ms 47.0 ms 47.0 ms
deno-postgres 40.6 ms 24.6 ( 33.5 ms … 51.4 ms) 42.2 ms 51.4 ms 51.4 ms deno-postgres 37.0 ms 27.0 ( 32.0 ms … 48.1 ms) 39.4 ms 48.1 ms 48.1 ms
summary summary
pglue pglue
1.84x faster than postgres-js 3.20x faster than postgres.js
2.68x faster than deno-postgres 4.20x faster than deno-postgres
``` ```
[1]: https://github.com/porsager/postgres [1]: https://github.com/porsager/postgres

View File

@@ -60,7 +60,7 @@ for (const n of [1, 5, 10]) {
}); });
Deno.bench({ Deno.bench({
name: `postgres-js`, name: `postgres.js`,
group: `select n=${n}`, group: `select n=${n}`,
async fn(b) { async fn(b) {
await bench_select(b, n, () => c_pgjs`select * from pg_type`); await bench_select(b, n, () => c_pgjs`select * from pg_type`);
@@ -95,7 +95,7 @@ for (const n of [1, 10, 100, 200]) {
}); });
Deno.bench({ Deno.bench({
name: `postgres-js`, name: `postgres.js`,
group: `insert n=${n}`, group: `insert n=${n}`,
async fn(b) { async fn(b) {
await c_pgjs`begin`; await c_pgjs`begin`;

View File

@@ -1,5 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.1.3", "version": "0.3.0",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

19
mod.ts
View File

@@ -8,7 +8,7 @@ import {
union, union,
unknown, unknown,
} from "./valita.ts"; } from "./valita.ts";
import { Pool, wire_connect, type LogLevel } from "./wire.ts"; import { Pool, wire_connect } from "./wire.ts";
import { sql_types, type SqlTypeMap } from "./query.ts"; import { sql_types, type SqlTypeMap } from "./query.ts";
export { export {
@@ -27,11 +27,10 @@ export {
sql, sql,
is_sql, is_sql,
Query, Query,
type Row,
type CommandResult,
type Result, type Result,
type Results, type Row,
type ResultStream, type Rows,
type RowStream,
} from "./query.ts"; } from "./query.ts";
export type Options = { export type Options = {
@@ -61,6 +60,7 @@ const ParsedOptions = object({
runtime_params: record(string()).optional(() => ({})), runtime_params: record(string()).optional(() => ({})),
max_connections: number().optional(() => 10), max_connections: number().optional(() => 10),
idle_timeout: number().optional(() => 20), idle_timeout: number().optional(() => 20),
reconnect_delay: number().optional(() => 5),
types: record(unknown()) types: record(unknown())
.optional(() => ({})) .optional(() => ({}))
.map((types): SqlTypeMap => ({ ...sql_types, ...types })), .map((types): SqlTypeMap => ({ ...sql_types, ...types })),
@@ -101,10 +101,6 @@ export function connect(s: string, options: Options = {}) {
postgres.connect = connect; postgres.connect = connect;
export type PostgresEvents = {
log(level: LogLevel, ctx: object, msg: string): void;
};
export class Postgres extends Pool { export class Postgres extends Pool {
readonly #options; readonly #options;
@@ -113,8 +109,9 @@ export class Postgres extends Pool {
this.#options = options; this.#options = options;
} }
async connect() { async connect(options: Options = {}) {
const wire = await wire_connect(this.#options); const opts = ParsedOptions.parse({ ...this.#options, ...options });
const wire = await wire_connect(opts);
return wire.on("log", (l, c, s) => this.emit("log", l, c, s)); return wire.on("log", (l, c, s) => this.emit("log", l, c, s));
} }
} }

View File

@@ -323,22 +323,15 @@ export const sql_types: SqlTypeMap = {
sql.types = sql_types; sql.types = sql_types;
type ReadonlyTuple<T extends readonly unknown[]> = readonly [...T]; export interface Result {
export interface CommandResult {
readonly tag: string; readonly tag: string;
} }
export interface Result<T> extends CommandResult, ReadonlyTuple<[T]> { export interface Rows<T> extends Result, ReadonlyArray<T> {
readonly row: T;
}
export interface Results<T> extends CommandResult, ReadonlyArray<T> {
readonly rows: ReadonlyArray<T>; readonly rows: ReadonlyArray<T>;
} }
export interface ResultStream<T> export interface RowStream<T> extends AsyncIterable<T[], Result, void> {}
extends AsyncIterable<T[], CommandResult, void> {}
export interface Row extends Iterable<unknown, void, void> { export interface Row extends Iterable<unknown, void, void> {
[column: string]: unknown; [column: string]: unknown;
@@ -351,12 +344,10 @@ export interface QueryOptions {
readonly stdout: WritableStream<Uint8Array> | null; readonly stdout: WritableStream<Uint8Array> | null;
} }
export class Query<T = Row> export class Query<T = Row> implements PromiseLike<Rows<T>>, RowStream<T> {
implements PromiseLike<Results<T>>, ResultStream<T>
{
readonly #f; readonly #f;
constructor(f: (options: Partial<QueryOptions>) => ResultStream<T>) { constructor(f: (options: Partial<QueryOptions>) => RowStream<T>) {
this.#f = f; this.#f = f;
} }
@@ -431,20 +422,18 @@ export class Query<T = Row>
return this.#f(options); return this.#f(options);
} }
async first(): Promise<Result<T>> { async first(): Promise<T> {
const { rows, tag } = await this.collect(1); const rows = await this.collect(1);
if (!rows.length) throw new TypeError(`expected one row, got none instead`); if (rows.length !== 0) return rows[0];
const row = rows[0]; else throw new TypeError(`expected one row, got none instead`);
return Object.assign([row] as const, { row: rows[0], tag });
} }
async first_or<S>(value: S): Promise<Result<T | S>> { async first_or<S>(value: S): Promise<T | S> {
const { rows, tag } = await this.collect(1); const rows = await this.collect(1);
const row = rows.length ? rows[0] : value; return rows.length !== 0 ? rows[0] : value;
return Object.assign([row] as const, { row: rows[0], tag });
} }
async collect(count = Number.POSITIVE_INFINITY): Promise<Results<T>> { async collect(count = Number.POSITIVE_INFINITY): Promise<Rows<T>> {
const iter = this[Symbol.asyncIterator](); const iter = this[Symbol.asyncIterator]();
let next; let next;
const rows = []; const rows = [];
@@ -470,8 +459,8 @@ export class Query<T = Row>
return n; return n;
} }
then<S = Results<T>, U = never>( then<S = Rows<T>, U = never>(
f?: ((rows: Results<T>) => S | PromiseLike<S>) | null, f?: ((rows: Rows<T>) => S | PromiseLike<S>) | null,
g?: ((reason?: unknown) => U | PromiseLike<U>) | null g?: ((reason?: unknown) => U | PromiseLike<U>) | null
) { ) {
return this.collect().then(f, g); return this.collect().then(f, g);

12
test.ts
View File

@@ -16,7 +16,7 @@ Deno.test(`integers`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query` const { a, b, c } = await pg.query`
select select
${"0x100"}::int2 as a, ${"0x100"}::int2 as a,
${777}::int4 as b, ${777}::int4 as b,
@@ -32,7 +32,7 @@ Deno.test(`integers`, async () => {
expect(b).toBe(777); expect(b).toBe(777);
expect(c).toBe(1234); expect(c).toBe(1234);
const [{ large }] = const { large } =
await pg.query`select ${"10000000000000000"}::int8 as large`.first(); await pg.query`select ${"10000000000000000"}::int8 as large`.first();
expect(large).toBe(10000000000000000n); expect(large).toBe(10000000000000000n);
@@ -47,7 +47,7 @@ Deno.test(`boolean`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query` const { a, b, c } = await pg.query`
select select
${true}::bool as a, ${true}::bool as a,
${"n"}::bool as b, ${"n"}::bool as b,
@@ -63,7 +63,7 @@ Deno.test(`bytea`, async () => {
await using pg = await connect(); await using pg = await connect();
await using _tx = await pg.begin(); await using _tx = await pg.begin();
const [{ string, array, buffer }] = await pg.query` const { string, array, buffer } = await pg.query`
select select
${"hello, world"}::bytea as string, ${"hello, world"}::bytea as string,
${[1, 2, 3, 4, 5]}::bytea as array, ${[1, 2, 3, 4, 5]}::bytea as array,
@@ -93,7 +93,7 @@ Deno.test(`row`, async () => {
).tag ).tag
).toBe(`COPY 1`); ).toBe(`COPY 1`);
const [row] = await pg.query`select * from my_table`.first(); const row = await pg.query`select * from my_table`.first();
{ {
// columns by name // columns by name
const { a, b, c } = row; const { a, b, c } = row;
@@ -132,7 +132,7 @@ Deno.test(`sql injection`, async () => {
`INSERT 0 1` `INSERT 0 1`
); );
const [{ name }] = await pg.query<{ name: string }>` const { name } = await pg.query<{ name: string }>`
select name from users select name from users
`.first(); `.first();

434
wire.ts
View File

@@ -1,5 +1,6 @@
import { import {
type BinaryLike, type BinaryLike,
buf_concat,
buf_concat_fast, buf_concat_fast,
buf_eq, buf_eq,
buf_xor, buf_xor,
@@ -7,8 +8,9 @@ import {
from_base64, from_base64,
from_utf8, from_utf8,
jit, jit,
type Receiver,
semaphore, semaphore,
semaphore_fast, type Sender,
to_base64, to_base64,
to_utf8, to_utf8,
TypedEmitter, TypedEmitter,
@@ -33,11 +35,11 @@ import {
type EncoderType, type EncoderType,
} from "./ser.ts"; } from "./ser.ts";
import { import {
type CommandResult,
format, format,
is_sql, is_sql,
Query, Query,
type ResultStream, type Result,
type RowStream,
type Row, type Row,
sql, sql,
type SqlFragment, type SqlFragment,
@@ -89,7 +91,7 @@ export class PostgresError extends WireError {
} }
} }
function severity_level(s: string): LogLevel { function severity_log_level(s: string): LogLevel {
switch (s) { switch (s) {
case "DEBUG": case "DEBUG":
return "debug"; return "debug";
@@ -446,87 +448,72 @@ export interface WireOptions {
readonly password: string; readonly password: string;
readonly database: string | null; readonly database: string | null;
readonly runtime_params: Record<string, string>; readonly runtime_params: Record<string, string>;
readonly reconnect_delay: number;
readonly types: SqlTypeMap; readonly types: SqlTypeMap;
} }
export type WireEvents = { export type WireEvents = {
log(level: LogLevel, ctx: object, msg: string): void; log(level: LogLevel, ctx: object, msg: string): void;
notice(notice: PostgresError): void; notice(notice: PostgresError): void;
parameter(name: string, value: string, prev: string | null): void;
notify(channel: string, payload: string, process_id: number): void; notify(channel: string, payload: string, process_id: number): void;
parameter(name: string, value: string, prev: string | null): void;
close(reason?: unknown): void; close(reason?: unknown): void;
}; };
export interface Transaction extends CommandResult, AsyncDisposable { export interface Transaction extends Result, AsyncDisposable {
readonly open: boolean; readonly open: boolean;
commit(): Promise<CommandResult>; commit(): Promise<Result>;
rollback(): Promise<CommandResult>; rollback(): Promise<Result>;
} }
export type ChannelEvents = { notify: NotificationHandler }; export type ChannelEvents = { notify: NotificationHandler };
export type NotificationHandler = (payload: string, process_id: number) => void; export type NotificationHandler = (payload: string, process_id: number) => void;
export interface Channel export interface Channel
extends TypedEmitter<ChannelEvents>, extends TypedEmitter<ChannelEvents>,
CommandResult, Result,
AsyncDisposable { AsyncDisposable {
readonly name: string; readonly name: string;
readonly open: boolean; readonly open: boolean;
notify(payload: string): Promise<CommandResult>; notify(payload: string): Promise<Result>;
unlisten(): Promise<CommandResult>; unlisten(): Promise<Result>;
} }
export async function wire_connect(options: WireOptions) { export async function wire_connect(options: WireOptions) {
const { host, port } = options; const wire = new Wire(options);
const wire = new Wire(await socket_connect(host, port), options); return await wire.connect(), wire;
return await wire.connected, wire;
} }
async function socket_connect(hostname: string, port: number) { export class Wire<V extends WireEvents = WireEvents>
if (hostname.startsWith("/")) { extends TypedEmitter<V>
const path = join(hostname, `.s.PGSQL.${port}`); implements Disposable
return await Deno.connect({ transport: "unix", path }); {
} else {
const socket = await Deno.connect({ transport: "tcp", hostname, port });
return socket.setNoDelay(), socket.setKeepAlive(), socket;
}
}
export class Wire extends TypedEmitter<WireEvents> implements Disposable {
readonly #socket;
readonly #params; readonly #params;
readonly #auth; readonly #connect;
readonly #connected;
readonly #query; readonly #query;
readonly #begin; readonly #begin;
readonly #listen; readonly #listen;
readonly #notify; readonly #notify;
readonly #close; readonly #close;
get socket() {
return this.#socket;
}
get params() { get params() {
return this.#params; return this.#params;
} }
get connected() { constructor(options: WireOptions) {
return this.#connected;
}
constructor(socket: Deno.Conn, options: WireOptions) {
super(); super();
({ ({
params: this.#params, params: this.#params,
auth: this.#auth, connect: this.#connect,
query: this.#query, query: this.#query,
begin: this.#begin, begin: this.#begin,
listen: this.#listen, listen: this.#listen,
notify: this.#notify, notify: this.#notify,
close: this.#close, close: this.#close,
} = wire_impl(this, socket, options)); } = wire_impl(this, options));
this.#socket = socket; }
(this.#connected = this.#auth()).catch(close);
connect() {
return this.#connect();
} }
query<T = Row>(sql: SqlFragment): Query<T>; query<T = Row>(sql: SqlFragment): Query<T>;
@@ -559,16 +546,15 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
} }
async get(param: string) { async get(param: string) {
return ( return await this.query`select current_setting(${param}, true)`
await this.query`select current_setting(${param}, true)`
.map(([s]) => String(s)) .map(([s]) => String(s))
.first_or(null) .first_or(null);
)[0];
} }
async set(param: string, value: string, local = false) { async set(param: string, value: string, local = false) {
return await this return await this.query`select set_config(${param}, ${value}, ${local})`
.query`select set_config(${param}, ${value}, ${local})`.execute(); .map(([s]) => String(s))
.first();
} }
close(reason?: unknown) { close(reason?: unknown) {
@@ -580,59 +566,101 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
} }
} }
const msg_PD = object({ P: Parse, D: Describe }); async function socket_connect(hostname: string, port: number) {
const msg_BE = object({ B: Bind, E: Execute }); if (hostname.startsWith("/")) {
const msg_BEc = object({ B: Bind, E: Execute, c: CopyDone }); const path = join(hostname, `.s.PGSQL.${port}`);
const msg_BEcC = object({ B: Bind, E: Execute, c: CopyDone, C: Close }); return await Deno.connect({ transport: "unix", path });
} else {
const socket = await Deno.connect({ transport: "tcp", hostname, port });
return socket.setNoDelay(), socket.setKeepAlive(), socket;
}
}
function wire_impl( function wire_impl(
wire: Wire, wire: Wire,
socket: Deno.Conn, {
{ user, database, password, runtime_params, types }: WireOptions host,
port,
user,
database,
password,
runtime_params,
reconnect_delay,
types,
}: WireOptions
) { ) {
// current runtime parameters as reported by postgres
const params: Parameters = Object.create(null); const params: Parameters = Object.create(null);
function log(level: LogLevel, ctx: object, msg: string) { function log(level: LogLevel, ctx: object, msg: string) {
wire.emit("log", level, ctx, msg); wire.emit("log", level, ctx, msg);
} }
// wire supports re-connection; socket and read/write channels are null when closed
let connected = false;
let should_reconnect = false;
let socket: Deno.Conn | null = null;
let read_pop: Receiver<Uint8Array> | null = null;
let write_push: Sender<Uint8Array> | null = null;
async function read<T>(type: Encoder<T>) { async function read<T>(type: Encoder<T>) {
const msg = await read_recv(); const msg = read_pop !== null ? await read_pop() : null;
if (msg === null) throw new WireError(`connection closed`); if (msg !== null) return ser_decode(type, msg_check_err(msg));
else return ser_decode(type, msg_check_err(msg)); else throw new WireError(`connection closed`);
} }
async function read_raw() { async function read_msg() {
const msg = await read_recv(); const msg = read_pop !== null ? await read_pop() : null;
if (msg === null) throw new WireError(`connection closed`); if (msg !== null) return msg;
else return msg; else throw new WireError(`connection closed`);
} }
async function* read_socket() { async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) {
const buf = new Uint8Array(64 * 1024); let err;
for (let n; (n = await socket.read(buf)) !== null; )
yield buf.subarray(0, n);
}
const read_recv = channel.receiver<Uint8Array>(async function read(send) {
let err: unknown;
try { try {
let buf = new Uint8Array(); const header_size = 5;
for await (const chunk of read_socket()) { const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
buf = buf_concat_fast(buf, chunk); let buf = new Uint8Array(); // concatenated messages read so far
for (let n; (n = ser_decode(Header, buf).length + 1) <= buf.length; ) { for (let read; (read = await socket.read(read_buf)) !== null; ) {
const msg = buf.subarray(0, n); buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf
buf = buf.subarray(n); while (buf.length >= header_size) {
const size = ser_decode(Header, buf).length + 1;
if (buf.length < size) break;
const msg = buf.subarray(0, size); // shift one message from buf
buf = buf.subarray(size);
if (!handle_msg(msg)) push(msg);
}
}
// there should be nothing left in buf if we gracefully exited
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
} catch (e) {
throw (err = e);
} finally {
onclose(err);
}
}
function handle_msg(msg: Uint8Array) {
switch (msg_type(msg)) { switch (msg_type(msg)) {
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC
case NoticeResponse.type: { case NoticeResponse.type: {
const { fields } = ser_decode(NoticeResponse, msg); const { fields } = ser_decode(NoticeResponse, msg);
const notice = new PostgresError(fields); const notice = new PostgresError(fields);
log(severity_level(notice.severity), notice, notice.message); log(severity_log_level(notice.severity), notice, notice.message);
wire.emit("notice", notice); wire.emit("notice", notice);
continue; return true;
}
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
return true;
} }
case ParameterStatus.type: { case ParameterStatus.type: {
@@ -644,48 +672,82 @@ function wire_impl(
value, value,
}); });
wire.emit("parameter", name, value, prev); wire.emit("parameter", name, value, prev);
continue; return true;
}
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
continue;
} }
} }
send(msg); return false;
} }
}
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
} catch (e) {
throw ((err = e), e);
} finally {
wire.emit("close", err);
}
});
function write<T>(type: Encoder<T>, value: T) { function write<T>(type: Encoder<T>, value: T) {
return write_raw(ser_encode(type, value)); write_msg(ser_encode(type, value));
} }
async function write_raw(buf: Uint8Array) { function write_msg(buf: Uint8Array) {
if (write_push !== null) write_push(buf);
else throw new WireError(`connection closed`);
}
async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) {
let err;
try {
for (let buf; (buf = await pop()) !== null; ) {
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any
for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf;
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls
for (let i = 0, n = buf.length; i < n; ) for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i)); i += await socket.write(buf.subarray(i));
} }
} catch (e) {
throw (err = e);
} finally {
onclose(err);
}
}
async function connect() {
using _rlock = await rlock();
using _wlock = await wlock();
if (connected) return;
try {
const s = (socket = await socket_connect(host, port));
read_pop = channel.receiver((push) => read_socket(s, push));
write_push = channel.sender((pop) => write_socket(s, pop));
await handle_auth(); // run auth with rw lock
(connected = true), (should_reconnect = reconnect_delay !== 0);
} catch (e) {
throw (close(e), e);
}
}
function reconnect() {
connect().catch((err) => {
log("warn", err as Error, `reconnect failed`);
setTimeout(reconnect, reconnect_delay);
});
}
function close(reason?: unknown) { function close(reason?: unknown) {
socket.close(), read_recv.close(reason); (should_reconnect = false), onclose(reason);
}
function onclose(reason?: unknown) {
if (!connected) return;
else connected = false;
socket?.close(), (socket = null);
read_pop?.close(reason), (read_pop = null);
write_push?.close(reason), (write_push = null);
for (const name of Object.keys(params))
delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0);
should_reconnect &&= (setTimeout(reconnect, reconnect_delay), false);
wire.emit("close", reason);
} }
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
const rlock = semaphore_fast(); const rlock = semaphore();
const wlock = semaphore_fast(); const wlock = semaphore();
function pipeline<T>( function pipeline<T>(
w: () => void | PromiseLike<void>, w: () => void | PromiseLike<void>,
@@ -697,39 +759,38 @@ function wire_impl(
}); });
} }
function pipeline_read<T>(r: () => T | PromiseLike<T>) { async function pipeline_read<T>(r: () => T | PromiseLike<T>) {
return rlock(async function pipeline_read() { using _lock = await rlock();
try { try {
return await r(); return await r();
} finally { } finally {
try { try {
let msg; let msg;
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); while (msg_type((msg = await read_msg())) !== ReadyForQuery.type);
({ tx_status } = ser_decode(ReadyForQuery, msg)); ({ tx_status } = ser_decode(ReadyForQuery, msg));
} catch { } catch {
// ignored // ignored
} }
} }
});
} }
function pipeline_write<T>(w: () => T | PromiseLike<T>) { async function pipeline_write<T>(w: () => T | PromiseLike<T>) {
return wlock(async function pipeline_write() { using _lock = await wlock();
try { try {
return await w(); return await w();
} finally { } finally {
try { try {
await write(Sync, {}); write(Sync, {});
} catch { } catch {
// ignored // ignored
} }
} }
});
} }
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP
async function auth() { async function handle_auth() {
await write(StartupMessage, { // always run within rw lock (see connect())
write(StartupMessage, {
version: 196608, version: 196608,
params: { params: {
application_name: "pglue", application_name: "pglue",
@@ -744,7 +805,7 @@ function wire_impl(
}); });
auth: for (;;) { auth: for (;;) {
const msg = msg_check_err(await read_raw()); const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) { switch (msg_type(msg)) {
case NegotiateProtocolVersion.type: { case NegotiateProtocolVersion.type: {
const { bad_options } = ser_decode(NegotiateProtocolVersion, msg); const { bad_options } = ser_decode(NegotiateProtocolVersion, msg);
@@ -762,7 +823,7 @@ function wire_impl(
throw new WireError(`kerberos authentication is deprecated`); throw new WireError(`kerberos authentication is deprecated`);
case 3: // AuthenticationCleartextPassword case 3: // AuthenticationCleartextPassword
await write(PasswordMessage, { password }); write(PasswordMessage, { password });
continue; continue;
case 5: // AuthenticationMD5Password case 5: // AuthenticationMD5Password
@@ -778,7 +839,7 @@ function wire_impl(
// AuthenticationSASL // AuthenticationSASL
case 10: case 10:
await auth_sasl(); await handle_auth_sasl();
continue; continue;
default: default:
@@ -786,8 +847,9 @@ function wire_impl(
} }
} }
// wait for ready
ready: for (;;) { ready: for (;;) {
const msg = msg_check_err(await read_raw()); const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) { switch (msg_type(msg)) {
case BackendKeyData.type: case BackendKeyData.type:
continue; // ignored continue; // ignored
@@ -797,11 +859,18 @@ function wire_impl(
break ready; break ready;
} }
} }
// re-listen previously registered channels
await Promise.all(
channels
.keys()
.map((name) => query(sql`listen ${sql.ident(name)}`).execute())
);
} }
// https://www.postgresql.org/docs/current/sasl-authentication.html#SASL-SCRAM-SHA-256 // https://www.postgresql.org/docs/current/sasl-authentication.html#SASL-SCRAM-SHA-256
// https://datatracker.ietf.org/doc/html/rfc5802 // https://datatracker.ietf.org/doc/html/rfc5802
async function auth_sasl() { async function handle_auth_sasl() {
const bits = 256; const bits = 256;
const hash = `SHA-${bits}`; const hash = `SHA-${bits}`;
const mechanism = `SCRAM-${hash}`; const mechanism = `SCRAM-${hash}`;
@@ -858,7 +927,7 @@ function wire_impl(
)}`; )}`;
const client_first_message_bare = `${username},${initial_nonce}`; const client_first_message_bare = `${username},${initial_nonce}`;
const client_first_message = `${gs2_header}${client_first_message_bare}`; const client_first_message = `${gs2_header}${client_first_message_bare}`;
await write(SASLInitialResponse, { mechanism, data: client_first_message }); write(SASLInitialResponse, { mechanism, data: client_first_message });
const server_first_message_str = from_utf8( const server_first_message_str = from_utf8(
(await read(AuthenticationSASLContinue)).data (await read(AuthenticationSASLContinue)).data
@@ -877,7 +946,7 @@ function wire_impl(
const client_proof = buf_xor(client_key, client_signature); const client_proof = buf_xor(client_key, client_signature);
const proof = `p=${to_base64(client_proof)}`; const proof = `p=${to_base64(client_proof)}`;
const client_final_message = `${client_final_message_without_proof},${proof}`; const client_final_message = `${client_final_message_without_proof},${proof}`;
await write(SASLResponse, { data: client_final_message }); write(SASLResponse, { data: client_final_message });
const server_key = await hmac(salted_password, "Server Key"); const server_key = await hmac(salted_password, "Server Key");
const server_signature = await hmac(server_key, auth_message); const server_signature = await hmac(server_key, auth_message);
@@ -897,29 +966,28 @@ function wire_impl(
readonly name = `__st${st_ids++}`; readonly name = `__st${st_ids++}`;
constructor(readonly query: string) {} constructor(readonly query: string) {}
parse_task: Promise<{ #parse_task: Promise<{
ser_params: ParameterSerializer; ser_params: ParameterSerializer;
Row: RowConstructor; Row: RowConstructor;
}> | null = null; }> | null = null;
parse() { parse() {
return (this.parse_task ??= this.#parse()); return (this.#parse_task ??= this.#parse());
} }
async #parse() { async #parse() {
try { try {
const { name, query } = this; const { name, query } = this;
return await pipeline( return await pipeline(
() => () => {
write(msg_PD, { write(Parse, { statement: name, query, param_types: [] });
P: { statement: name, query, param_types: [] }, write(Describe, { which: "S", name });
D: { which: "S", name }, },
}),
async () => { async () => {
await read(ParseComplete); await read(ParseComplete);
const ser_params = make_param_ser(await read(ParameterDescription)); const ser_params = make_param_ser(await read(ParameterDescription));
const msg = msg_check_err(await read_raw()); const msg = msg_check_err(await read_msg());
const Row = const Row =
msg_type(msg) === NoData.type msg_type(msg) === NoData.type
? EmptyRow ? EmptyRow
@@ -929,13 +997,13 @@ function wire_impl(
} }
); );
} catch (e) { } catch (e) {
throw ((this.parse_task = null), e); throw ((this.#parse_task = null), e);
} }
} }
portals = 0; #portals = 0;
portal() { portal() {
return `${this.name}_${this.portals++}`; return `${this.name}_${this.#portals++}`;
} }
} }
@@ -944,6 +1012,7 @@ function wire_impl(
(params: unknown[]): (string | null)[]; (params: unknown[]): (string | null)[];
} }
// makes function to serialize query parameters
function make_param_ser({ param_types }: ParameterDescription) { function make_param_ser({ param_types }: ParameterDescription) {
return jit.compiled<ParameterSerializer>`function ser_params(xs) { return jit.compiled<ParameterSerializer>`function ser_params(xs) {
return [ return [
@@ -960,6 +1029,7 @@ function wire_impl(
new (columns: (BinaryLike | null)[]): Row; new (columns: (BinaryLike | null)[]): Row;
} }
// makes function to create Row objects
const EmptyRow = make_row_ctor({ columns: [] }); const EmptyRow = make_row_ctor({ columns: [] });
function make_row_ctor({ columns }: RowDescription) { function make_row_ctor({ columns }: RowDescription) {
const Row = jit.compiled<RowConstructor>`function Row(xs) { const Row = jit.compiled<RowConstructor>`function Row(xs) {
@@ -998,7 +1068,7 @@ function wire_impl(
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
) { ) {
for (let rows = [], i = 0; ; ) { for (let rows = [], i = 0; ; ) {
const msg = msg_check_err(await read_raw()); const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) { switch (msg_type(msg)) {
default: default:
case DataRow.type: case DataRow.type:
@@ -1034,7 +1104,7 @@ function wire_impl(
if (stream !== null) { if (stream !== null) {
const writer = stream.getWriter(); const writer = stream.getWriter();
try { try {
for (let msg; msg_type((msg = await read_raw())) !== CopyDone.type; ) { for (let msg; msg_type((msg = await read_msg())) !== CopyDone.type; ) {
const { data } = ser_decode(CopyData, msg_check_err(msg)); const { data } = ser_decode(CopyData, msg_check_err(msg));
await writer.write(to_utf8(data)); await writer.write(to_utf8(data));
} }
@@ -1046,7 +1116,7 @@ function wire_impl(
writer.releaseLock(); writer.releaseLock();
} }
} else { } else {
while (msg_type(msg_check_err(await read_raw())) !== CopyDone.type); while (msg_type(msg_check_err(await read_msg())) !== CopyDone.type);
} }
} }
@@ -1055,16 +1125,16 @@ function wire_impl(
const reader = stream.getReader(); const reader = stream.getReader();
try { try {
for (let next; !(next = await reader.read()).done; ) for (let next; !(next = await reader.read()).done; )
await write(CopyData, { data: next.value }); write(CopyData, { data: next.value });
await write(CopyDone, {}); write(CopyDone, {});
} catch (e) { } catch (e) {
await write(CopyFail, { cause: String(e) }); write(CopyFail, { cause: String(e) });
throw e; throw e;
} finally { } finally {
reader.releaseLock(); reader.releaseLock();
} }
} else { } else {
await write(CopyDone, {}); write(CopyDone, {});
} }
} }
@@ -1072,21 +1142,21 @@ function wire_impl(
query: string, query: string,
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
log("debug", { query: query }, `executing simple query`); yield* await pipeline(
() => {
const { chunks, err } = await pipeline( log("debug", { query }, `executing simple query`);
async () => { write(QueryMessage, { query });
await write(QueryMessage, { query }); write_copy_in(stdin);
return write_copy_in(stdin);
}, },
async () => { async () => {
for (let chunks = [], err; ; ) { for (let chunks = [], err; ; ) {
const msg = await read_raw(); const msg = await read_msg();
switch (msg_type(msg)) { switch (msg_type(msg)) {
default: default:
case ReadyForQuery.type: case ReadyForQuery.type:
return { chunks, err }; if (err) throw err;
else return chunks;
case RowDescription.type: { case RowDescription.type: {
const Row = make_row_ctor(ser_decode(RowDescription, msg)); const Row = make_row_ctor(ser_decode(RowDescription, msg));
@@ -1110,8 +1180,6 @@ function wire_impl(
} }
); );
yield* chunks;
if (err) throw err;
return { tag: "" }; return { tag: "" };
} }
@@ -1120,13 +1188,8 @@ function wire_impl(
params: unknown[], params: unknown[],
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
log( const { query, name: statement } = st;
"debug",
{ query: st.query, statement: st.name, params },
`executing query`
);
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
const portal = st.portal(); const portal = st.portal();
@@ -1134,23 +1197,17 @@ function wire_impl(
try { try {
const { rows, tag } = await pipeline( const { rows, tag } = await pipeline(
async () => { async () => {
const B = { log("debug", { query, statement, params }, `executing query`);
write(Bind, {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}; });
const E = { portal, row_limit: 0 }; write(Execute, { portal, row_limit: 0 });
const C = { which: "P" as const, name: portal };
if (stdin !== null) {
await write(msg_BE, { B, E });
await write_copy_in(stdin); await write_copy_in(stdin);
return write(Close, C); write(Close, { which: "P", name: portal });
} else {
return write(msg_BEcC, { B, E, c: {}, C });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);
@@ -1163,7 +1220,7 @@ function wire_impl(
} catch (e) { } catch (e) {
try { try {
await pipeline( await pipeline(
() => write(Close, { which: "P" as const, name: portal }), () => write(Close, { which: "P", name: portal }),
() => read(CloseComplete) () => read(CloseComplete)
); );
} catch { } catch {
@@ -1180,35 +1237,25 @@ function wire_impl(
chunk_size: number, chunk_size: number,
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): RowStream<Row> {
log( const { query, name: statement } = st;
"debug",
{ query: st.query, statement: st.name, params },
`executing chunked query`
);
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
const portal = st.portal(); const portal = st.portal();
try { try {
let { done, rows, tag } = await pipeline( let { done, rows, tag } = await pipeline(
async () => { () => {
const B = { log("debug", { query, statement, params }, `executing chunked query`);
write(Bind, {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}; });
const E = { portal, row_limit: chunk_size }; write(Execute, { portal, row_limit: chunk_size });
if (stdin !== null) {
await write(msg_BE, { B, E });
return write_copy_in(stdin); return write_copy_in(stdin);
} else {
return write(msg_BEc, { B, E, c: {} });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);
@@ -1230,7 +1277,7 @@ function wire_impl(
return { tag }; return { tag };
} finally { } finally {
await pipeline( await pipeline(
() => write(Close, { which: "P" as const, name: portal }), () => write(Close, { which: "P", name: portal }),
() => read(CloseComplete) () => read(CloseComplete)
); );
} }
@@ -1278,7 +1325,7 @@ function wire_impl(
return tx_stack.indexOf(this) !== -1; return tx_stack.indexOf(this) !== -1;
} }
constructor(begin: CommandResult) { constructor(begin: Result) {
Object.assign(this, begin); Object.assign(this, begin);
} }
@@ -1336,7 +1383,7 @@ function wire_impl(
return channels.get(this.#name) === this; return channels.get(this.#name) === this;
} }
constructor(name: string, listen: CommandResult) { constructor(name: string, listen: Result) {
super(); super();
Object.assign(this, listen); Object.assign(this, listen);
this.#name = name; this.#name = name;
@@ -1358,7 +1405,7 @@ function wire_impl(
} }
}; };
return { params, auth, query, begin, listen, notify, close }; return { params, connect, query, begin, listen, notify, close };
} }
export interface PoolOptions extends WireOptions { export interface PoolOptions extends WireOptions {
@@ -1370,7 +1417,7 @@ export type PoolEvents = {
log(level: LogLevel, ctx: object, msg: string): void; log(level: LogLevel, ctx: object, msg: string): void;
}; };
export interface PoolWire extends Wire { export interface PoolWire<V extends WireEvents = WireEvents> extends Wire<V> {
readonly connection_id: number; readonly connection_id: number;
readonly borrowed: boolean; readonly borrowed: boolean;
release(): void; release(): void;
@@ -1380,8 +1427,8 @@ export interface PoolTransaction extends Transaction {
readonly wire: PoolWire; readonly wire: PoolWire;
} }
export class Pool export class Pool<V extends PoolEvents = PoolEvents>
extends TypedEmitter<PoolEvents> extends TypedEmitter<V>
implements PromiseLike<PoolWire>, Disposable implements PromiseLike<PoolWire>, Disposable
{ {
readonly #acquire; readonly #acquire;
@@ -1517,9 +1564,8 @@ function pool_impl(
}; };
async function connect() { async function connect() {
const { host, port } = options; const wire = new PoolWire(options);
const wire = new PoolWire(await socket_connect(host, port), options); await wire.connect(), all.add(wire);
await wire.connected, all.add(wire);
const { connection_id } = wire; const { connection_id } = wire;
return wire return wire
.on("log", (l, c, s) => pool.emit("log", l, { ...c, connection_id }, s)) .on("log", (l, c, s) => pool.emit("log", l, { ...c, connection_id }, s))