20 Commits

Author SHA1 Message Date
a4c0055c79 Return row value direct in .first() and .first_or(x) 2025-01-12 01:25:55 +11:00
00002525e4 Implement wire automatic reconnection 2025-01-12 01:09:34 +11:00
328cc63536 Update benchmark results 2025-01-12 00:26:38 +11:00
29b79f25c0 Fix close event not firing on wire 2025-01-12 00:23:26 +11:00
29b2796627 Log query inside pipeline handler 2025-01-11 07:24:23 +11:00
02f8098811 Code quality check 2025-01-11 07:21:44 +11:00
da7f7e12f3 Implement wire reconnect support 2025-01-11 06:02:32 +11:00
6f9e9770cf Ensure pipeline is locked during authentication phase 2025-01-11 02:43:42 +11:00
137422601b Implement simple query support 2025-01-11 02:00:05 +11:00
3793e14f50 Add v prefix to all version tags 2025-01-11 00:53:45 +11:00
b194397645 Optimise query request write 2025-01-11 00:46:17 +11:00
858b7a95f3 Add streaming query testing code 2025-01-11 00:39:00 +11:00
a88da00dec Update version tag in readme 2025-01-11 00:28:51 +11:00
cefe14b9dc Ensure stdout writable stream is always closed 2025-01-11 00:28:14 +11:00
4e68e34fd0 Add more testing code 2025-01-11 00:15:19 +11:00
826190ecc9 Add github import url 2025-01-10 20:35:32 +11:00
72749e5841 Update readme 2025-01-10 20:32:06 +11:00
b9829bc70d Update readme 2025-01-10 19:56:16 +11:00
9eecf29bc5 Update readme 2025-01-10 19:53:04 +11:00
8964cb342e Update benchmarks 2025-01-10 19:32:41 +11:00
10 changed files with 757 additions and 347 deletions

121
README.md
View File

@@ -1,14 +1,35 @@
# pglue
## Performance
The glue for TypeScript to PostgreSQL.
pglue implements automatic query pipelining which makes it especially performant with many queries concurrently executed on a single connection.
## Overview
- 🌟 [High performance](#benchmarks), fully asynchronous, written in modern TypeScript
- 🐢 First class Deno support
- 💬 Automatic query parameterisation
- 🌧️ Automatic query pipelining
- 📣 Listen/notify support
- 📤 Connection pool support
## Installation
```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.3.0/mod.ts";
// ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.3.0/mod.ts";
```
## Documentation
TODO: Write the documentation in more detail here.
## Benchmarks
Performance is generally on par with [postgres.js][1] and up to **4x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
Performance is generally on par with [postgres.js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
Tested on a 4 core 2800 MHz x86_64-pc-linux-gnu QEMU VM with Deno 2.1.4 and local PostgreSQL 17.1 installation connected via TCP on localhost:
Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
Query `select * from pg_type`:
```
CPU | Common KVM Processor v2.0
@@ -18,74 +39,78 @@ benchmark time/iter (avg) iter/s (min … max) p75
--------------- ----------------------------- --------------------- --------------------------
group select n=1
pglue 9.9 ms 101.1 ( 7.9 ms … 17.8 ms) 10.2 ms 17.8 ms 17.8 ms
postgres.js 8.8 ms 114.2 ( 7.0 ms … 9.5 ms) 9.1 ms 9.5 ms 9.5 ms
deno-postgres 37.4 ms 26.7 ( 25.3 ms … 42.8 ms) 39.2 ms 42.8 ms 42.8 ms
pglue 8.8 ms 113.8 ( 7.2 ms … 11.8 ms) 9.7 ms 11.8 ms 11.8 ms
postgres.js 10.8 ms 92.3 ( 8.1 ms … 22.0 ms) 11.2 ms 22.0 ms 22.0 ms
deno-postgres 38.9 ms 25.7 ( 23.5 ms … 51.9 ms) 40.3 ms 51.9 ms 51.9 ms
summary
pglue
1.13x slower than postgres.js
3.78x faster than deno-postgres
1.23x faster than postgres.js
4.42x faster than deno-postgres
group select n=5
pglue 48.2 ms 20.8 ( 41.9 ms … 68.5 ms) 50.3 ms 68.5 ms 68.5 ms
postgres.js 43.6 ms 22.9 ( 38.1 ms … 57.3 ms) 48.6 ms 57.3 ms 57.3 ms
deno-postgres 186.5 ms 5.4 (138.4 ms … 213.2 ms) 193.6 ms 213.2 ms 213.2 ms
pglue 40.1 ms 25.0 ( 36.1 ms … 48.2 ms) 40.7 ms 48.2 ms 48.2 ms
postgres.js 48.7 ms 20.5 ( 38.9 ms … 61.2 ms) 52.7 ms 61.2 ms 61.2 ms
deno-postgres 184.7 ms 5.4 (166.5 ms … 209.5 ms) 190.7 ms 209.5 ms 209.5 ms
summary
pglue
1.11x slower than postgres.js
3.87x faster than deno-postgres
1.22x faster than postgres.js
4.61x faster than deno-postgres
group select n=10
pglue 97.8 ms 10.2 ( 90.2 ms … 105.0 ms) 104.0 ms 105.0 ms 105.0 ms
postgres.js 93.8 ms 10.7 ( 80.9 ms … 107.7 ms) 106.1 ms 107.7 ms 107.7 ms
deno-postgres 333.9 ms 3.0 (205.6 ms … 394.9 ms) 377.4 ms 394.9 ms 394.9 ms
pglue 80.7 ms 12.4 ( 73.5 ms … 95.4 ms) 82.2 ms 95.4 ms 95.4 ms
postgres.js 89.1 ms 11.2 ( 82.5 ms … 101.7 ms) 94.4 ms 101.7 ms 101.7 ms
deno-postgres 375.3 ms 2.7 (327.4 ms … 393.9 ms) 390.7 ms 393.9 ms 393.9 ms
summary
pglue
1.10x faster than postgres.js
4.65x faster than deno-postgres
```
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
```
group insert n=1
pglue 259.2 µs 3,858 (165.4 µs … 2.8 ms) 258.0 µs 775.4 µs 2.8 ms
postgres.js 235.9 µs 4,239 (148.8 µs … 1.2 ms) 250.3 µs 577.4 µs 585.6 µs
deno-postgres 306.7 µs 3,260 (198.8 µs … 1.3 ms) 325.9 µs 1.0 ms 1.3 ms
summary
pglue
1.10x slower than postgres.js
1.18x faster than deno-postgres
group insert n=10
pglue 789.7 µs 1,266 (553.2 µs … 2.7 ms) 783.4 µs 2.4 ms 2.7 ms
postgres.js 755.6 µs 1,323 (500.5 µs … 3.4 ms) 795.0 µs 2.8 ms 3.4 ms
deno-postgres 2.2 ms 458.1 ( 1.6 ms … 5.2 ms) 2.3 ms 4.8 ms 5.2 ms
summary
pglue
1.04x slower than postgres.js
3.42x faster than deno-postgres
group insert n=1
pglue 237.5 µs 4,210 (143.9 µs … 1.3 ms) 249.2 µs 953.3 µs 1.3 ms
postgres.js 242.5 µs 4,124 (137.4 µs … 886.4 µs) 263.4 µs 762.8 µs 865.5 µs
deno-postgres 295.1 µs 3,389 (163.8 µs … 899.3 µs) 340.0 µs 641.7 µs 899.3 µs
summary
pglue
1.02x faster than postgres.js
1.24x faster than deno-postgres
group insert n=10
pglue 1.1 ms 869.6 (610.1 µs … 2.1 ms) 1.2 ms 2.0 ms 2.1 ms
postgres.js 755.9 µs 1,323 (387.6 µs … 4.7 ms) 805.4 µs 2.8 ms 4.7 ms
deno-postgres 2.3 ms 434.4 ( 1.6 ms … 10.6 ms) 2.4 ms 6.5 ms 10.6 ms
summary
pglue
1.52x slower than postgres.js
2.00x faster than deno-postgres
2.76x faster than deno-postgres
group insert n=100
pglue 9.2 ms 109.0 ( 5.5 ms … 15.6 ms) 10.4 ms 15.6 ms 15.6 ms
postgres.js 14.8 ms 67.4 ( 9.6 ms … 35.8 ms) 16.6 ms 35.8 ms 35.8 ms
deno-postgres 18.8 ms 53.1 ( 14.5 ms … 25.8 ms) 20.9 ms 25.8 ms 25.8 ms
pglue 5.8 ms 172.0 ( 3.2 ms … 9.9 ms) 6.8 ms 9.9 ms 9.9 ms
postgres.js 13.0 ms 76.8 ( 8.6 ms … 20.8 ms) 15.4 ms 20.8 ms 20.8 ms
deno-postgres 18.5 ms 54.1 ( 14.3 ms … 32.1 ms) 20.0 ms 32.1 ms 32.1 ms
summary
pglue
1.62x faster than postgres.js
2.05x faster than deno-postgres
2.24x faster than postgres.js
3.18x faster than deno-postgres
group insert n=200
pglue 15.0 ms 66.6 ( 11.1 ms … 19.0 ms) 16.7 ms 19.0 ms 19.0 ms
postgres.js 28.1 ms 35.6 ( 22.8 ms … 40.0 ms) 29.1 ms 40.0 ms 40.0 ms
deno-postgres 35.9 ms 27.9 ( 29.7 ms … 46.5 ms) 37.2 ms 46.5 ms 46.5 ms
pglue 8.8 ms 113.4 ( 6.0 ms … 14.1 ms) 10.0 ms 14.1 ms 14.1 ms
postgres.js 28.2 ms 35.5 ( 21.1 ms … 47.0 ms) 29.6 ms 47.0 ms 47.0 ms
deno-postgres 37.0 ms 27.0 ( 32.0 ms … 48.1 ms) 39.4 ms 48.1 ms 48.1 ms
summary
pglue
1.87x faster than postgres.js
2.39x faster than deno-postgres
3.20x faster than postgres.js
4.20x faster than deno-postgres
```
[1]: https://github.com/porsager/postgres

View File

@@ -1,4 +1,4 @@
import * as pglue from "./mod.ts";
import pglue from "./mod.ts";
import postgres_js from "https://deno.land/x/postgresjs/mod.js";
import * as deno_postgres from "https://deno.land/x/postgres/mod.ts";

View File

@@ -1,9 +1,5 @@
{
"name": "@luaneko/pglue",
"version": "0.1.0",
"exports": "./mod.ts",
"tasks": {
"test": "deno run --watch -A mod_test.ts",
"bench": "deno bench --watch -A"
}
"version": "0.3.0",
"exports": "./mod.ts"
}

39
deno.lock generated
View File

@@ -2,23 +2,50 @@
"version": "4",
"specifiers": {
"jsr:@badrap/valita@~0.4.2": "0.4.2",
"jsr:@std/assert@^1.0.10": "1.0.10",
"jsr:@std/bytes@^1.0.3": "1.0.4",
"jsr:@std/bytes@^1.0.4": "1.0.4",
"jsr:@std/encoding@^1.0.6": "1.0.6",
"jsr:@std/expect@*": "1.0.11",
"jsr:@std/internal@^1.0.5": "1.0.5",
"jsr:@std/path@^1.0.8": "1.0.8",
"jsr:@std/streams@*": "1.0.8",
"npm:pg-connection-string@^2.7.0": "2.7.0"
},
"jsr": {
"@badrap/valita@0.4.2": {
"integrity": "af8a829e82eac71adbc7b60352798f94dcc66d19fab16b657957ca9e646c25fd"
},
"@std/assert@1.0.10": {
"integrity": "59b5cbac5bd55459a19045d95cc7c2ff787b4f8527c0dd195078ff6f9481fbb3",
"dependencies": [
"jsr:@std/internal"
]
},
"@std/bytes@1.0.4": {
"integrity": "11a0debe522707c95c7b7ef89b478c13fb1583a7cfb9a85674cd2cc2e3a28abc"
},
"@std/encoding@1.0.6": {
"integrity": "ca87122c196e8831737d9547acf001766618e78cd8c33920776c7f5885546069"
},
"@std/expect@1.0.11": {
"integrity": "5aa5d5cf891e9a3249e45ea770de15189e5a2faee2122ee5746b10d1c310a19b",
"dependencies": [
"jsr:@std/assert",
"jsr:@std/internal"
]
},
"@std/internal@1.0.5": {
"integrity": "54a546004f769c1ac9e025abd15a76b6671ddc9687e2313b67376125650dc7ba"
},
"@std/path@1.0.8": {
"integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
},
"@std/streams@1.0.8": {
"integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3",
"dependencies": [
"jsr:@std/bytes@^1.0.3"
]
}
},
"npm": {
@@ -433,11 +460,11 @@
"https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465",
"https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b",
"https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
}
}

View File

@@ -1 +1 @@
export * from "https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts";
export * from "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts";

21
mod.ts
View File

@@ -8,8 +8,8 @@ import {
union,
unknown,
} from "./valita.ts";
import { Pool, wire_connect, type LogLevel } from "./wire.ts";
import { sql_types, type SqlType, type SqlTypeMap } from "./query.ts";
import { Pool, wire_connect } from "./wire.ts";
import { sql_types, type SqlTypeMap } from "./query.ts";
export {
WireError,
@@ -27,11 +27,10 @@ export {
sql,
is_sql,
Query,
type Row,
type CommandResult,
type Result,
type Results,
type ResultStream,
type Row,
type Rows,
type RowStream,
} from "./query.ts";
export type Options = {
@@ -61,6 +60,7 @@ const ParsedOptions = object({
runtime_params: record(string()).optional(() => ({})),
max_connections: number().optional(() => 10),
idle_timeout: number().optional(() => 20),
reconnect_delay: number().optional(() => 5),
types: record(unknown())
.optional(() => ({}))
.map((types): SqlTypeMap => ({ ...sql_types, ...types })),
@@ -101,10 +101,6 @@ export function connect(s: string, options: Options = {}) {
postgres.connect = connect;
export type PostgresEvents = {
log(level: LogLevel, ctx: object, msg: string): void;
};
export class Postgres extends Pool {
readonly #options;
@@ -113,8 +109,9 @@ export class Postgres extends Pool {
this.#options = options;
}
async connect() {
const wire = await wire_connect(this.#options);
async connect(options: Options = {}) {
const opts = ParsedOptions.parse({ ...this.#options, ...options });
const wire = await wire_connect(opts);
return wire.on("log", (l, c, s) => this.emit("log", l, c, s));
}
}

View File

@@ -1,23 +0,0 @@
import postgres from "./mod.ts";
await using pool = postgres(`postgres://test:test@localhost:5432/test`, {
runtime_params: { client_min_messages: "INFO" },
});
pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx));
await pool.begin(async (pg, tx) => {
await pg.query`
create table my_test (
key integer primary key generated always as identity,
data text not null
)
`;
await pg.query`
insert into my_test (data) values (${[1, 2, 3]}::bytea)
`;
console.log(await pg.query`select * from my_test`);
await tx.rollback();
});

View File

@@ -127,10 +127,36 @@ export const bool: SqlType = {
return s !== "f";
},
output(x) {
return typeof x === "undefined" || x === null ? null : x ? "t" : "f";
if (typeof x === "undefined" || x === null) return null;
const b = bool_names[String(x).toLowerCase()];
if (typeof b === "boolean") return b ? "t" : "f";
else throw new SqlTypeError(`invalid bool output '${x}'`);
},
};
const bool_names: Partial<Record<string, boolean>> = {
// https://www.postgresql.org/docs/current/datatype-boolean.html#DATATYPE-BOOLEAN
t: true,
tr: true,
tru: true,
true: true,
y: true,
ye: true,
yes: true,
on: true,
1: true,
f: false,
fa: false,
fal: false,
fals: false,
false: false,
n: false,
no: false,
of: false,
off: false,
0: false,
};
export const text: SqlType = {
input(s) {
return s;
@@ -297,42 +323,39 @@ export const sql_types: SqlTypeMap = {
sql.types = sql_types;
type ReadonlyTuple<T extends readonly unknown[]> = readonly [...T];
export interface CommandResult {
export interface Result {
readonly tag: string;
}
export interface Result<T> extends CommandResult, ReadonlyTuple<[T]> {
readonly row: T;
}
export interface Results<T> extends CommandResult, ReadonlyArray<T> {
export interface Rows<T> extends Result, ReadonlyArray<T> {
readonly rows: ReadonlyArray<T>;
}
export interface ResultStream<T>
extends AsyncIterable<T[], CommandResult, void> {}
export interface RowStream<T> extends AsyncIterable<T[], Result, void> {}
export interface Row extends Iterable<unknown, void, void> {
[column: string]: unknown;
}
export interface QueryOptions {
readonly simple: boolean;
readonly chunk_size: number;
readonly stdin: ReadableStream<Uint8Array> | null;
readonly stdout: WritableStream<Uint8Array> | null;
}
export class Query<T = Row>
implements PromiseLike<Results<T>>, ResultStream<T>
{
export class Query<T = Row> implements PromiseLike<Rows<T>>, RowStream<T> {
readonly #f;
constructor(f: (options: Partial<QueryOptions>) => ResultStream<T>) {
constructor(f: (options: Partial<QueryOptions>) => RowStream<T>) {
this.#f = f;
}
simple(simple = true) {
const f = this.#f;
return new Query((o) => f({ simple, ...o }));
}
chunked(chunk_size = 1) {
const f = this.#f;
return new Query((o) => f({ chunk_size, ...o }));
@@ -399,26 +422,25 @@ export class Query<T = Row>
return this.#f(options);
}
async first(): Promise<Result<T>> {
const { rows, tag } = await this.collect(1);
if (!rows.length) throw new Error(`expected one row, got none instead`);
const row = rows[0];
return Object.assign([row] as const, { row: rows[0], tag });
async first(): Promise<T> {
const rows = await this.collect(1);
if (rows.length !== 0) return rows[0];
else throw new TypeError(`expected one row, got none instead`);
}
async first_or<S>(value: S): Promise<Result<T | S>> {
const { rows, tag } = await this.collect(1);
const row = rows.length ? rows[0] : value;
return Object.assign([row] as const, { row: rows[0], tag });
async first_or<S>(value: S): Promise<T | S> {
const rows = await this.collect(1);
return rows.length !== 0 ? rows[0] : value;
}
async collect(count = Number.POSITIVE_INFINITY): Promise<Results<T>> {
async collect(count = Number.POSITIVE_INFINITY): Promise<Rows<T>> {
const iter = this[Symbol.asyncIterator]();
let next;
const rows = [];
for (let i = 0; !(next = await iter.next()).done; ) {
const { value: c } = next;
for (let j = 0, n = c.length; i < count && j < n; ) rows[i++] = c[j++];
const chunk = next.value;
for (let j = 0, n = chunk.length; i < count && j < n; )
rows[i++] = chunk[j++];
}
return Object.assign(rows, next.value, { rows });
}
@@ -437,8 +459,8 @@ export class Query<T = Row>
return n;
}
then<S = Results<T>, U = never>(
f?: ((rows: Results<T>) => S | PromiseLike<S>) | null,
then<S = Rows<T>, U = never>(
f?: ((rows: Rows<T>) => S | PromiseLike<S>) | null,
g?: ((reason?: unknown) => U | PromiseLike<U>) | null
) {
return this.collect().then(f, g);
@@ -453,7 +475,8 @@ function str_to_stream(s: string) {
return new ReadableStream({
type: "bytes",
start(c) {
c.enqueue(to_utf8(s)), c.close();
if (s.length !== 0) c.enqueue(to_utf8(s));
c.close();
},
});
}

231
test.ts Normal file
View File

@@ -0,0 +1,231 @@
import pglue, { PostgresError, SqlTypeError } from "./mod.ts";
import { expect } from "jsr:@std/expect";
import { toText } from "jsr:@std/streams";
async function connect(params?: Record<string, string>) {
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
runtime_params: { client_min_messages: "INFO", ...params },
});
return pg.on("log", (_level, ctx, msg) => {
console.info(`${msg}`, ctx);
});
}
Deno.test(`integers`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const { a, b, c } = await pg.query`
select
${"0x100"}::int2 as a,
${777}::int4 as b,
${{
[Symbol.toPrimitive](hint: string) {
expect(hint).toBe("number");
return "1234";
},
}}::int8 as c
`.first();
expect(a).toBe(0x100);
expect(b).toBe(777);
expect(c).toBe(1234);
const { large } =
await pg.query`select ${"10000000000000000"}::int8 as large`.first();
expect(large).toBe(10000000000000000n);
await expect(pg.query`select ${100000}::int2`).rejects.toThrow(SqlTypeError);
await expect(pg.query`select ${"100000"}::text::int2`).rejects.toThrow(
PostgresError
);
});
Deno.test(`boolean`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const { a, b, c } = await pg.query`
select
${true}::bool as a,
${"n"}::bool as b,
${undefined}::bool as c
`.first();
expect(a).toBe(true);
expect(b).toBe(false);
expect(c).toBe(null);
});
Deno.test(`bytea`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const { string, array, buffer } = await pg.query`
select
${"hello, world"}::bytea as string,
${[1, 2, 3, 4, 5]}::bytea as array,
${Uint8Array.of(5, 4, 3, 2, 1)}::bytea as buffer
`.first();
expect(string).toEqual(new TextEncoder().encode("hello, world"));
expect(array).toEqual(Uint8Array.of(1, 2, 3, 4, 5));
expect(buffer).toEqual(Uint8Array.of(5, 4, 3, 2, 1));
});
Deno.test(`row`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
expect(
(
await pg.query`create table my_table (a text not null, b text not null, c text not null)`
).tag
).toBe(`CREATE TABLE`);
expect(
(
await pg.query`copy my_table from stdin`.stdin(
`field a\tfield b\tfield c`
)
).tag
).toBe(`COPY 1`);
const row = await pg.query`select * from my_table`.first();
{
// columns by name
const { a, b, c } = row;
expect(a).toBe("field a");
expect(b).toBe("field b");
expect(c).toBe("field c");
}
{
// columns by index
const [a, b, c] = row;
expect(a).toBe("field a");
expect(b).toBe("field b");
expect(c).toBe("field c");
}
const { readable, writable } = new TransformStream<Uint8Array>(
{},
new ByteLengthQueuingStrategy({ highWaterMark: 4096 }),
new ByteLengthQueuingStrategy({ highWaterMark: 4096 })
);
await pg.query`copy my_table to stdout`.stdout(writable);
expect(await toText(readable)).toBe(`field a\tfield b\tfield c\n`);
});
Deno.test(`sql injection`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const input = `injection'); drop table users; --`;
expect((await pg.query`create table users (name text not null)`).tag).toBe(
`CREATE TABLE`
);
expect((await pg.query`insert into users (name) values (${input})`).tag).toBe(
`INSERT 0 1`
);
const { name } = await pg.query<{ name: string }>`
select name from users
`.first();
expect(name).toBe(input);
});
Deno.test(`listen/notify`, async () => {
await using pg = await connect();
const sent: string[] = [];
await using ch = await pg.listen(`my channel`, (payload) => {
expect(payload).toBe(sent.shift());
});
for (let i = 0; i < 5; i++) {
const payload = `test payload ${i}`;
sent.push(payload);
await ch.notify(payload);
}
expect(sent.length).toBe(0);
});
Deno.test(`transactions`, async () => {
await using pg = await connect();
await pg.begin(async (pg) => {
await pg.begin(async (pg, tx) => {
await pg.query`create table my_table (field text not null)`;
await tx.rollback();
});
await expect(pg.query`select * from my_table`).rejects.toThrow(
PostgresError
);
});
await expect(pg.query`select * from my_table`).rejects.toThrow(PostgresError);
await pg.begin(async (pg) => {
await pg.begin(async (pg, tx) => {
await pg.begin(async (pg, tx) => {
await pg.begin(async (pg) => {
await pg.query`create table my_table (field text not null)`;
});
await tx.commit();
});
expect(await pg.query`select * from my_table`.count()).toBe(0);
await tx.rollback();
});
await expect(pg.query`select * from my_table`).rejects.toThrow(
PostgresError
);
});
});
Deno.test(`streaming`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
await pg.query`create table my_table (field text not null)`;
for (let i = 0; i < 20; i++) {
await pg.query`insert into my_table (field) values (${i})`;
}
let i = 0;
for await (const chunk of pg.query`select * from my_table`.chunked(5)) {
expect(chunk.length).toBe(5);
for (const row of chunk) expect(row.field).toBe(`${i++}`);
}
expect(i).toBe(20);
});
Deno.test(`simple`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const rows = await pg.query`
create table my_table (field text not null);
insert into my_table (field) values ('one'), ('two'), ('three');
select * from my_table;
select * from my_table where field = 'two';
`.simple();
expect(rows.length).toBe(4);
const [{ field: a }, { field: b }, { field: c }, { field: d }] = rows;
expect(a).toBe("one");
expect(b).toBe("two");
expect(c).toBe("three");
expect(d).toBe("two");
});

574
wire.ts
View File

@@ -1,5 +1,6 @@
import {
type BinaryLike,
buf_concat,
buf_concat_fast,
buf_eq,
buf_xor,
@@ -7,8 +8,9 @@ import {
from_base64,
from_utf8,
jit,
type Receiver,
semaphore,
semaphore_fast,
type Sender,
to_base64,
to_utf8,
TypedEmitter,
@@ -33,10 +35,11 @@ import {
type EncoderType,
} from "./ser.ts";
import {
type CommandResult,
format,
is_sql,
Query,
type ResultStream,
type Result,
type RowStream,
type Row,
sql,
type SqlFragment,
@@ -88,7 +91,7 @@ export class PostgresError extends WireError {
}
}
function severity_level(s: string): LogLevel {
function severity_log_level(s: string): LogLevel {
switch (s) {
case "DEBUG":
return "debug";
@@ -445,91 +448,76 @@ export interface WireOptions {
readonly password: string;
readonly database: string | null;
readonly runtime_params: Record<string, string>;
readonly reconnect_delay: number;
readonly types: SqlTypeMap;
}
export type WireEvents = {
log(level: LogLevel, ctx: object, msg: string): void;
notice(notice: PostgresError): void;
parameter(name: string, value: string, prev: string | null): void;
notify(channel: string, payload: string, process_id: number): void;
parameter(name: string, value: string, prev: string | null): void;
close(reason?: unknown): void;
};
export interface Transaction extends CommandResult, AsyncDisposable {
export interface Transaction extends Result, AsyncDisposable {
readonly open: boolean;
commit(): Promise<CommandResult>;
rollback(): Promise<CommandResult>;
commit(): Promise<Result>;
rollback(): Promise<Result>;
}
export type ChannelEvents = { notify: NotificationHandler };
export type NotificationHandler = (payload: string, process_id: number) => void;
export interface Channel
extends TypedEmitter<ChannelEvents>,
CommandResult,
Result,
AsyncDisposable {
readonly name: string;
readonly open: boolean;
notify(payload: string): Promise<CommandResult>;
unlisten(): Promise<CommandResult>;
notify(payload: string): Promise<Result>;
unlisten(): Promise<Result>;
}
export async function wire_connect(options: WireOptions) {
const { host, port } = options;
const wire = new Wire(await socket_connect(host, port), options);
return await wire.connected, wire;
const wire = new Wire(options);
return await wire.connect(), wire;
}
async function socket_connect(hostname: string, port: number) {
if (hostname.startsWith("/")) {
const path = join(hostname, `.s.PGSQL.${port}`);
return await Deno.connect({ transport: "unix", path });
} else {
const socket = await Deno.connect({ transport: "tcp", hostname, port });
return socket.setNoDelay(), socket.setKeepAlive(), socket;
}
}
export class Wire extends TypedEmitter<WireEvents> implements Disposable {
readonly #socket;
export class Wire<V extends WireEvents = WireEvents>
extends TypedEmitter<V>
implements Disposable
{
readonly #params;
readonly #auth;
readonly #connected;
readonly #connect;
readonly #query;
readonly #begin;
readonly #listen;
readonly #notify;
readonly #close;
get socket() {
return this.#socket;
}
get params() {
return this.#params;
}
get connected() {
return this.#connected;
}
constructor(socket: Deno.Conn, options: WireOptions) {
constructor(options: WireOptions) {
super();
({
params: this.#params,
auth: this.#auth,
connect: this.#connect,
query: this.#query,
begin: this.#begin,
listen: this.#listen,
notify: this.#notify,
close: this.#close,
} = wire_impl(this, socket, options));
this.#socket = socket;
(this.#connected = this.#auth()).catch(close);
} = wire_impl(this, options));
}
query(sql: SqlFragment): Query;
query(s: TemplateStringsArray, ...xs: unknown[]): Query;
connect() {
return this.#connect();
}
query<T = Row>(sql: SqlFragment): Query<T>;
query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
return this.#query(is_sql(s) ? s : sql(s, ...xs));
}
@@ -557,17 +545,16 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
return this.#notify(channel, payload);
}
async get(param: string, missing_null = true) {
return (
await this.query`select current_setting(${param}, ${missing_null})`
.map(([s]) => String(s))
.first_or(null)
)[0];
async get(param: string) {
return await this.query`select current_setting(${param}, true)`
.map(([s]) => String(s))
.first_or(null);
}
async set(param: string, value: string, local = false) {
return await this
.query`select set_config(${param}, ${value}, ${local})`.execute();
return await this.query`select set_config(${param}, ${value}, ${local})`
.map(([s]) => String(s))
.first();
}
close(reason?: unknown) {
@@ -579,105 +566,188 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
}
}
async function socket_connect(hostname: string, port: number) {
if (hostname.startsWith("/")) {
const path = join(hostname, `.s.PGSQL.${port}`);
return await Deno.connect({ transport: "unix", path });
} else {
const socket = await Deno.connect({ transport: "tcp", hostname, port });
return socket.setNoDelay(), socket.setKeepAlive(), socket;
}
}
function wire_impl(
wire: Wire,
socket: Deno.Conn,
{ user, database, password, runtime_params, types }: WireOptions
{
host,
port,
user,
database,
password,
runtime_params,
reconnect_delay,
types,
}: WireOptions
) {
// current runtime parameters as reported by postgres
const params: Parameters = Object.create(null);
function log(level: LogLevel, ctx: object, msg: string) {
wire.emit("log", level, ctx, msg);
}
// wire supports re-connection; socket and read/write channels are null when closed
let connected = false;
let should_reconnect = false;
let socket: Deno.Conn | null = null;
let read_pop: Receiver<Uint8Array> | null = null;
let write_push: Sender<Uint8Array> | null = null;
async function read<T>(type: Encoder<T>) {
const msg = await read_recv();
if (msg === null) throw new WireError(`connection closed`);
else return ser_decode(type, msg_check_err(msg));
const msg = read_pop !== null ? await read_pop() : null;
if (msg !== null) return ser_decode(type, msg_check_err(msg));
else throw new WireError(`connection closed`);
}
async function read_raw() {
const msg = await read_recv();
if (msg === null) throw new WireError(`connection closed`);
else return msg;
async function read_msg() {
const msg = read_pop !== null ? await read_pop() : null;
if (msg !== null) return msg;
else throw new WireError(`connection closed`);
}
async function* read_socket() {
const buf = new Uint8Array(64 * 1024);
for (let n; (n = await socket.read(buf)) !== null; )
yield buf.subarray(0, n);
}
const read_recv = channel.receiver<Uint8Array>(async function read(send) {
async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) {
let err;
try {
let buf = new Uint8Array();
for await (const chunk of read_socket()) {
buf = buf_concat_fast(buf, chunk);
const header_size = 5;
const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
let buf = new Uint8Array(); // concatenated messages read so far
for (let n; (n = ser_decode(Header, buf).length + 1) <= buf.length; ) {
const msg = buf.subarray(0, n);
buf = buf.subarray(n);
switch (msg_type(msg)) {
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC
case NoticeResponse.type: {
const { fields } = ser_decode(NoticeResponse, msg);
const notice = new PostgresError(fields);
log(severity_level(notice.severity), notice, notice.message);
wire.emit("notice", notice);
continue;
}
case ParameterStatus.type: {
const { name, value } = ser_decode(ParameterStatus, msg);
const prev = params[name] ?? null;
Object.defineProperty(params, name, {
configurable: true,
enumerable: true,
value,
});
wire.emit("parameter", name, value, prev);
continue;
}
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
continue;
}
}
send(msg);
for (let read; (read = await socket.read(read_buf)) !== null; ) {
buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf
while (buf.length >= header_size) {
const size = ser_decode(Header, buf).length + 1;
if (buf.length < size) break;
const msg = buf.subarray(0, size); // shift one message from buf
buf = buf.subarray(size);
if (!handle_msg(msg)) push(msg);
}
}
// there should be nothing left in buf if we gracefully exited
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
wire.emit("close");
} catch (e) {
wire.emit("close", e);
throw (err = e);
} finally {
onclose(err);
}
});
function write<T>(type: Encoder<T>, value: T) {
return write_raw(ser_encode(type, value));
}
async function write_raw(buf: Uint8Array) {
for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i));
function handle_msg(msg: Uint8Array) {
switch (msg_type(msg)) {
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC
case NoticeResponse.type: {
const { fields } = ser_decode(NoticeResponse, msg);
const notice = new PostgresError(fields);
log(severity_log_level(notice.severity), notice, notice.message);
wire.emit("notice", notice);
return true;
}
case NotificationResponse.type: {
const { channel, payload, process_id } = ser_decode(
NotificationResponse,
msg
);
wire.emit("notify", channel, payload, process_id);
channels.get(channel)?.emit("notify", payload, process_id);
return true;
}
case ParameterStatus.type: {
const { name, value } = ser_decode(ParameterStatus, msg);
const prev = params[name] ?? null;
Object.defineProperty(params, name, {
configurable: true,
enumerable: true,
value,
});
wire.emit("parameter", name, value, prev);
return true;
}
}
return false;
}
function write<T>(type: Encoder<T>, value: T) {
write_msg(ser_encode(type, value));
}
function write_msg(buf: Uint8Array) {
if (write_push !== null) write_push(buf);
else throw new WireError(`connection closed`);
}
async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) {
let err;
try {
for (let buf; (buf = await pop()) !== null; ) {
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any
for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf;
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls
for (let i = 0, n = buf.length; i < n; )
i += await socket.write(buf.subarray(i));
}
} catch (e) {
throw (err = e);
} finally {
onclose(err);
}
}
async function connect() {
using _rlock = await rlock();
using _wlock = await wlock();
if (connected) return;
try {
const s = (socket = await socket_connect(host, port));
read_pop = channel.receiver((push) => read_socket(s, push));
write_push = channel.sender((pop) => write_socket(s, pop));
await handle_auth(); // run auth with rw lock
(connected = true), (should_reconnect = reconnect_delay !== 0);
} catch (e) {
throw (close(e), e);
}
}
function reconnect() {
connect().catch((err) => {
log("warn", err as Error, `reconnect failed`);
setTimeout(reconnect, reconnect_delay);
});
}
function close(reason?: unknown) {
socket.close(), read_recv.close(reason);
(should_reconnect = false), onclose(reason);
}
function onclose(reason?: unknown) {
if (!connected) return;
else connected = false;
socket?.close(), (socket = null);
read_pop?.close(reason), (read_pop = null);
write_push?.close(reason), (write_push = null);
for (const name of Object.keys(params))
delete (params as Record<string, string>)[name];
st_cache.clear(), (st_ids = 0);
(tx_status = "I"), (tx_stack.length = 0);
should_reconnect &&= (setTimeout(reconnect, reconnect_delay), false);
wire.emit("close", reason);
}
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
const rlock = semaphore_fast();
const wlock = semaphore_fast();
const rlock = semaphore();
const wlock = semaphore();
function pipeline<T>(
w: () => void | PromiseLike<void>,
@@ -689,31 +759,38 @@ function wire_impl(
});
}
function pipeline_read<T>(r: () => T | PromiseLike<T>) {
return rlock(async () => {
async function pipeline_read<T>(r: () => T | PromiseLike<T>) {
using _lock = await rlock();
try {
return await r();
} finally {
try {
return await r();
} finally {
let msg;
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type);
while (msg_type((msg = await read_msg())) !== ReadyForQuery.type);
({ tx_status } = ser_decode(ReadyForQuery, msg));
} catch {
// ignored
}
});
}
}
function pipeline_write<T>(w: () => T | PromiseLike<T>) {
return wlock(async () => {
async function pipeline_write<T>(w: () => T | PromiseLike<T>) {
using _lock = await wlock();
try {
return await w();
} finally {
try {
return await w();
} finally {
await write(Sync, {});
write(Sync, {});
} catch {
// ignored
}
});
}
}
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP
async function auth() {
await write(StartupMessage, {
async function handle_auth() {
// always run within rw lock (see connect())
write(StartupMessage, {
version: 196608,
params: {
application_name: "pglue",
@@ -728,7 +805,7 @@ function wire_impl(
});
auth: for (;;) {
const msg = msg_check_err(await read_raw());
const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) {
case NegotiateProtocolVersion.type: {
const { bad_options } = ser_decode(NegotiateProtocolVersion, msg);
@@ -746,7 +823,7 @@ function wire_impl(
throw new WireError(`kerberos authentication is deprecated`);
case 3: // AuthenticationCleartextPassword
await write(PasswordMessage, { password });
write(PasswordMessage, { password });
continue;
case 5: // AuthenticationMD5Password
@@ -762,7 +839,7 @@ function wire_impl(
// AuthenticationSASL
case 10:
await auth_sasl();
await handle_auth_sasl();
continue;
default:
@@ -770,8 +847,9 @@ function wire_impl(
}
}
// wait for ready
ready: for (;;) {
const msg = msg_check_err(await read_raw());
const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) {
case BackendKeyData.type:
continue; // ignored
@@ -781,11 +859,18 @@ function wire_impl(
break ready;
}
}
// re-listen previously registered channels
await Promise.all(
channels
.keys()
.map((name) => query(sql`listen ${sql.ident(name)}`).execute())
);
}
// https://www.postgresql.org/docs/current/sasl-authentication.html#SASL-SCRAM-SHA-256
// https://datatracker.ietf.org/doc/html/rfc5802
async function auth_sasl() {
async function handle_auth_sasl() {
const bits = 256;
const hash = `SHA-${bits}`;
const mechanism = `SCRAM-${hash}`;
@@ -842,7 +927,7 @@ function wire_impl(
)}`;
const client_first_message_bare = `${username},${initial_nonce}`;
const client_first_message = `${gs2_header}${client_first_message_bare}`;
await write(SASLInitialResponse, { mechanism, data: client_first_message });
write(SASLInitialResponse, { mechanism, data: client_first_message });
const server_first_message_str = from_utf8(
(await read(AuthenticationSASLContinue)).data
@@ -861,7 +946,7 @@ function wire_impl(
const client_proof = buf_xor(client_key, client_signature);
const proof = `p=${to_base64(client_proof)}`;
const client_final_message = `${client_final_message_without_proof},${proof}`;
await write(SASLResponse, { data: client_final_message });
write(SASLResponse, { data: client_final_message });
const server_key = await hmac(salted_password, "Server Key");
const server_signature = await hmac(server_key, auth_message);
@@ -881,47 +966,44 @@ function wire_impl(
readonly name = `__st${st_ids++}`;
constructor(readonly query: string) {}
parse_task: Promise<{
#parse_task: Promise<{
ser_params: ParameterSerializer;
Row: RowConstructor;
}> | null = null;
parse() {
return (this.parse_task ??= this.#parse());
return (this.#parse_task ??= this.#parse());
}
async #parse() {
try {
const { name, query } = this;
return await pipeline(
async () => {
await write(Parse, { statement: name, query, param_types: [] });
await write(Describe, { which: "S", name });
() => {
write(Parse, { statement: name, query, param_types: [] });
write(Describe, { which: "S", name });
},
async () => {
await read(ParseComplete);
const param_desc = await read(ParameterDescription);
const ser_params = make_param_ser(await read(ParameterDescription));
const msg = msg_check_err(await read_raw());
const row_desc =
const msg = msg_check_err(await read_msg());
const Row =
msg_type(msg) === NoData.type
? { columns: [] }
: ser_decode(RowDescription, msg);
? EmptyRow
: make_row_ctor(ser_decode(RowDescription, msg));
return {
ser_params: param_ser(param_desc),
Row: row_ctor(row_desc),
};
return { ser_params, Row };
}
);
} catch (e) {
throw ((this.parse_task = null), e);
throw ((this.#parse_task = null), e);
}
}
portals = 0;
#portals = 0;
portal() {
return `${this.name}_${this.portals++}`;
return `${this.name}_${this.#portals++}`;
}
}
@@ -930,7 +1012,8 @@ function wire_impl(
(params: unknown[]): (string | null)[];
}
function param_ser({ param_types }: ParameterDescription) {
// makes function to serialize query parameters
function make_param_ser({ param_types }: ParameterDescription) {
return jit.compiled<ParameterSerializer>`function ser_params(xs) {
return [
${jit.map(", ", param_types, (type_oid, i) => {
@@ -946,7 +1029,9 @@ function wire_impl(
new (columns: (BinaryLike | null)[]): Row;
}
function row_ctor({ columns }: RowDescription) {
// makes function to create Row objects
const EmptyRow = make_row_ctor({ columns: [] });
function make_row_ctor({ columns }: RowDescription) {
const Row = jit.compiled<RowConstructor>`function Row(xs) {
${jit.map(" ", columns, ({ name, type_oid }, i) => {
const type = types[type_oid] ?? text;
@@ -983,7 +1068,7 @@ function wire_impl(
stdout: WritableStream<Uint8Array> | null
) {
for (let rows = [], i = 0; ; ) {
const msg = msg_check_err(await read_raw());
const msg = msg_check_err(await read_msg());
switch (msg_type(msg)) {
default:
case DataRow.type:
@@ -1001,11 +1086,15 @@ function wire_impl(
case EmptyQueryResponse.type:
return { done: true as const, rows, tag: "" };
case RowDescription.type:
Row = make_row_ctor(ser_decode(RowDescription, msg));
continue;
case CopyInResponse.type:
continue;
case CopyOutResponse.type:
await read_copy_out(stdout);
await read_copy_out(stdout), (stdout = null);
continue;
}
}
@@ -1015,52 +1104,92 @@ function wire_impl(
if (stream !== null) {
const writer = stream.getWriter();
try {
for (let msg; msg_type((msg = await read_raw())) !== CopyDone.type; ) {
for (let msg; msg_type((msg = await read_msg())) !== CopyDone.type; ) {
const { data } = ser_decode(CopyData, msg_check_err(msg));
await writer.write(to_utf8(data));
}
await writer.close();
} catch (e) {
await writer.abort(e);
throw e;
} finally {
writer.releaseLock();
}
} else {
while (msg_type(msg_check_err(await read_raw())) !== CopyDone.type);
while (msg_type(msg_check_err(await read_msg())) !== CopyDone.type);
}
}
async function write_copy_in(stream: ReadableStream<Uint8Array> | null) {
if (stream !== null) {
const reader = stream.getReader();
let err;
try {
try {
for (let next; !(next = await reader.read()).done; )
await write(CopyData, { data: next.value });
} catch (e) {
err = e;
} finally {
if (typeof err === "undefined") await write(CopyDone, {});
else await write(CopyFail, { cause: String(err) });
}
for (let next; !(next = await reader.read()).done; )
write(CopyData, { data: next.value });
write(CopyDone, {});
} catch (e) {
write(CopyFail, { cause: String(e) });
throw e;
} finally {
reader.releaseLock();
}
} else {
await write(CopyDone, {});
write(CopyDone, {});
}
}
async function* execute_simple(
query: string,
stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null
): RowStream<Row> {
yield* await pipeline(
() => {
log("debug", { query }, `executing simple query`);
write(QueryMessage, { query });
write_copy_in(stdin);
},
async () => {
for (let chunks = [], err; ; ) {
const msg = await read_msg();
switch (msg_type(msg)) {
default:
case ReadyForQuery.type:
if (err) throw err;
else return chunks;
case RowDescription.type: {
const Row = make_row_ctor(ser_decode(RowDescription, msg));
const { rows } = await read_rows(Row, stdout);
chunks.push(rows);
stdout = null;
continue;
}
case EmptyQueryResponse.type:
case CommandComplete.type:
continue;
case ErrorResponse.type: {
const { fields } = ser_decode(ErrorResponse, msg);
err = new PostgresError(fields);
continue;
}
}
}
}
);
return { tag: "" };
}
async function* execute_fast(
st: Statement,
params: unknown[],
stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> {
log(
"debug",
{ query: st.query, statement: st.name, params },
`executing query`
);
): RowStream<Row> {
const { query, name: statement } = st;
const { ser_params, Row } = await st.parse();
const param_values = ser_params(params);
const portal = st.portal();
@@ -1068,16 +1197,17 @@ function wire_impl(
try {
const { rows, tag } = await pipeline(
async () => {
await write(Bind, {
log("debug", { query, statement, params }, `executing query`);
write(Bind, {
portal,
statement: st.name,
param_formats: [],
param_values,
column_formats: [],
});
await write(Execute, { portal, row_limit: 0 });
write(Execute, { portal, row_limit: 0 });
await write_copy_in(stdin);
await write(Close, { which: "P" as const, name: portal });
write(Close, { which: "P", name: portal });
},
async () => {
await read(BindComplete);
@@ -1088,10 +1218,14 @@ function wire_impl(
if (rows.length) yield rows;
return { tag };
} catch (e) {
await pipeline(
() => write(Close, { which: "P" as const, name: portal }),
() => read(CloseComplete)
);
try {
await pipeline(
() => write(Close, { which: "P", name: portal }),
() => read(CloseComplete)
);
} catch {
// ignored
}
throw e;
}
@@ -1103,29 +1237,25 @@ function wire_impl(
chunk_size: number,
stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> {
log(
"debug",
{ query: st.query, statement: st.name, params },
`executing chunked query`
);
): RowStream<Row> {
const { query, name: statement } = st;
const { ser_params, Row } = await st.parse();
const param_values = ser_params(params);
const portal = st.portal();
try {
let { done, rows, tag } = await pipeline(
async () => {
await write(Bind, {
() => {
log("debug", { query, statement, params }, `executing chunked query`);
write(Bind, {
portal,
statement: st.name,
param_formats: [],
param_values,
column_formats: [],
});
await write(Execute, { portal, row_limit: chunk_size });
await write_copy_in(stdin);
write(Execute, { portal, row_limit: chunk_size });
return write_copy_in(stdin);
},
async () => {
await read(BindComplete);
@@ -1147,21 +1277,26 @@ function wire_impl(
return { tag };
} finally {
await pipeline(
() => write(Close, { which: "P" as const, name: portal }),
() => write(Close, { which: "P", name: portal }),
() => read(CloseComplete)
);
}
}
function query(s: SqlFragment) {
const { query, params } = sql.format(s);
let st = st_cache.get(query);
if (!st) st_cache.set(query, (st = new Statement(query)));
function query(sql: SqlFragment) {
return new Query(
({ simple = false, chunk_size = 0, stdin = null, stdout = null }) => {
const { query, params } = format(sql);
if (simple) {
if (!params.length) return execute_simple(query, stdin, stdout);
else throw new WireError(`simple query cannot be parameterised`);
}
return new Query(({ chunk_size = 0, stdin = null, stdout = null }) =>
chunk_size !== 0
? execute_chunked(st, params, chunk_size, stdin, stdout)
: execute_fast(st, params, stdin, stdout)
let st = st_cache.get(query);
if (!st) st_cache.set(query, (st = new Statement(query)));
if (!chunk_size) return execute_fast(st, params, stdin, stdout);
else return execute_chunked(st, params, chunk_size, stdin, stdout);
}
);
}
@@ -1190,7 +1325,7 @@ function wire_impl(
return tx_stack.indexOf(this) !== -1;
}
constructor(begin: CommandResult) {
constructor(begin: Result) {
Object.assign(this, begin);
}
@@ -1248,7 +1383,7 @@ function wire_impl(
return channels.get(this.#name) === this;
}
constructor(name: string, listen: CommandResult) {
constructor(name: string, listen: Result) {
super();
Object.assign(this, listen);
this.#name = name;
@@ -1270,7 +1405,7 @@ function wire_impl(
}
};
return { params, auth, query, begin, listen, notify, close };
return { params, connect, query, begin, listen, notify, close };
}
export interface PoolOptions extends WireOptions {
@@ -1282,7 +1417,7 @@ export type PoolEvents = {
log(level: LogLevel, ctx: object, msg: string): void;
};
export interface PoolWire extends Wire {
export interface PoolWire<V extends WireEvents = WireEvents> extends Wire<V> {
readonly connection_id: number;
readonly borrowed: boolean;
release(): void;
@@ -1292,8 +1427,8 @@ export interface PoolTransaction extends Transaction {
readonly wire: PoolWire;
}
export class Pool
extends TypedEmitter<PoolEvents>
export class Pool<V extends PoolEvents = PoolEvents>
extends TypedEmitter<V>
implements PromiseLike<PoolWire>, Disposable
{
readonly #acquire;
@@ -1320,8 +1455,8 @@ export class Pool
}
}
query(sql: SqlFragment): Query;
query(s: TemplateStringsArray, ...xs: unknown[]): Query;
query<T = Row>(sql: SqlFragment): Query<T>;
query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
s = is_sql(s) ? s : sql(s, ...xs);
const acquire = this.#acquire;
@@ -1429,9 +1564,8 @@ function pool_impl(
};
async function connect() {
const { host, port } = options;
const wire = new PoolWire(await socket_connect(host, port), options);
await wire.connected, all.add(wire);
const wire = new PoolWire(options);
await wire.connect(), all.add(wire);
const { connection_id } = wire;
return wire
.on("log", (l, c, s) => pool.emit("log", l, { ...c, connection_id }, s))