11 Commits

Author SHA1 Message Date
3793e14f50 Add v prefix to all version tags 2025-01-11 00:53:45 +11:00
b194397645 Optimise query request write 2025-01-11 00:46:17 +11:00
858b7a95f3 Add streaming query testing code 2025-01-11 00:39:00 +11:00
a88da00dec Update version tag in readme 2025-01-11 00:28:51 +11:00
cefe14b9dc Ensure stdout writable stream is always closed 2025-01-11 00:28:14 +11:00
4e68e34fd0 Add more testing code 2025-01-11 00:15:19 +11:00
826190ecc9 Add github import url 2025-01-10 20:35:32 +11:00
72749e5841 Update readme 2025-01-10 20:32:06 +11:00
b9829bc70d Update readme 2025-01-10 19:56:16 +11:00
9eecf29bc5 Update readme 2025-01-10 19:53:04 +11:00
8964cb342e Update benchmarks 2025-01-10 19:32:41 +11:00
10 changed files with 424 additions and 129 deletions

103
README.md
View File

@@ -1,16 +1,37 @@
# pglue # pglue
## Performance The glue for TypeScript to PostgreSQL.
pglue implements automatic query pipelining which makes it especially performant with many queries concurrently executed on a single connection. ## Overview
- 🌟 [High performance](#benchmarks), fully asynchronous, written in modern TypeScript
- 🐢 First class Deno support
- 💬 Automatic query parameterisation
- 🌧️ Automatic query pipelining
- 📣 Listen/notify support
- 📤 Connection pool support
## Installation
```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.1.3/mod.ts";
// ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.1.3/mod.ts";
```
## Documentation
TODO: Write the documentation in more detail here.
## Benchmarks ## Benchmarks
Performance is generally on par with [postgres.js][1] and up to **4x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance. Performance is generally on par with [postgres-js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
Tested on a 4 core 2800 MHz x86_64-pc-linux-gnu QEMU VM with Deno 2.1.4 and local PostgreSQL 17.1 installation connected via TCP on localhost: Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
``` Query `select * from pg_type`:
```log
CPU | Common KVM Processor v2.0 CPU | Common KVM Processor v2.0
Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu) Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu)
@@ -18,74 +39,78 @@ benchmark time/iter (avg) iter/s (min … max) p75
--------------- ----------------------------- --------------------- -------------------------- --------------- ----------------------------- --------------------- --------------------------
group select n=1 group select n=1
pglue 9.9 ms 101.1 ( 7.9 ms … 17.8 ms) 10.2 ms 17.8 ms 17.8 ms pglue 8.3 ms 120.4 ( 7.2 ms … 14.4 ms) 8.5 ms 14.4 ms 14.4 ms
postgres.js 8.8 ms 114.2 ( 7.0 ms … 9.5 ms) 9.1 ms 9.5 ms 9.5 ms postgres-js 10.8 ms 92.3 ( 8.1 ms … 26.5 ms) 10.7 ms 26.5 ms 26.5 ms
deno-postgres 37.4 ms 26.7 ( 25.3 ms … 42.8 ms) 39.2 ms 42.8 ms 42.8 ms deno-postgres 37.1 ms 26.9 ( 33.4 ms … 41.3 ms) 38.5 ms 41.3 ms 41.3 ms
summary summary
pglue pglue
1.13x slower than postgres.js 1.30x faster than postgres-js
3.78x faster than deno-postgres 4.47x faster than deno-postgres
group select n=5 group select n=5
pglue 48.2 ms 20.8 ( 41.9 ms … 68.5 ms) 50.3 ms 68.5 ms 68.5 ms pglue 39.9 ms 25.1 ( 37.2 ms … 49.6 ms) 40.8 ms 49.6 ms 49.6 ms
postgres.js 43.6 ms 22.9 ( 38.1 ms … 57.3 ms) 48.6 ms 57.3 ms 57.3 ms postgres-js 42.4 ms 23.6 ( 36.5 ms … 61.8 ms) 44.2 ms 61.8 ms 61.8 ms
deno-postgres 186.5 ms 5.4 (138.4 ms … 213.2 ms) 193.6 ms 213.2 ms 213.2 ms deno-postgres 182.5 ms 5.5 (131.9 ms … 211.8 ms) 193.4 ms 211.8 ms 211.8 ms
summary summary
pglue pglue
1.11x slower than postgres.js 1.06x faster than postgres-js
3.87x faster than deno-postgres 4.57x faster than deno-postgres
group select n=10 group select n=10
pglue 97.8 ms 10.2 ( 90.2 ms … 105.0 ms) 104.0 ms 105.0 ms 105.0 ms pglue 78.9 ms 12.7 ( 72.3 ms … 88.9 ms) 82.5 ms 88.9 ms 88.9 ms
postgres.js 93.8 ms 10.7 ( 80.9 ms … 107.7 ms) 106.1 ms 107.7 ms 107.7 ms postgres-js 92.0 ms 10.9 ( 77.6 ms … 113.6 ms) 101.2 ms 113.6 ms 113.6 ms
deno-postgres 333.9 ms 3.0 (205.6 ms … 394.9 ms) 377.4 ms 394.9 ms 394.9 ms deno-postgres 326.6 ms 3.1 (208.8 ms … 406.0 ms) 388.8 ms 406.0 ms 406.0 ms
summary summary
pglue pglue
1.04x slower than postgres.js 1.17x faster than postgres-js
3.42x faster than deno-postgres 4.14x faster than deno-postgres
```
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
```log
group insert n=1 group insert n=1
pglue 237.5 µs 4,210 (143.9 µs … 1.3 ms) 249.2 µs 953.3 µs 1.3 ms pglue 303.3 µs 3,297 (165.6 µs … 2.4 ms) 321.6 µs 1.1 ms 2.4 ms
postgres.js 242.5 µs 4,124 (137.4 µs … 886.4 µs) 263.4 µs 762.8 µs 865.5 µs postgres-js 260.4 µs 3,840 (132.9 µs … 2.7 ms) 276.4 µs 1.1 ms 2.7 ms
deno-postgres 295.1 µs 3,389 (163.8 µs … 899.3 µs) 340.0 µs 641.7 µs 899.3 µs deno-postgres 281.6 µs 3,552 (186.1 µs … 1.5 ms) 303.8 µs 613.6 µs 791.8 µs
summary summary
pglue pglue
1.02x faster than postgres.js 1.17x slower than postgres-js
1.24x faster than deno-postgres 1.08x slower than deno-postgres
group insert n=10 group insert n=10
pglue 1.1 ms 869.6 (610.1 µs … 2.1 ms) 1.2 ms 2.0 ms 2.1 ms pglue 1.1 ms 878.5 (605.5 µs … 3.2 ms) 1.1 ms 2.2 ms 3.2 ms
postgres.js 755.9 µs 1,323 (387.6 µs … 4.7 ms) 805.4 µs 2.8 ms 4.7 ms postgres-js 849.3 µs 1,177 (529.5 µs … 10.1 ms) 770.6 µs 3.0 ms 10.1 ms
deno-postgres 2.3 ms 434.4 ( 1.6 ms … 10.6 ms) 2.4 ms 6.5 ms 10.6 ms deno-postgres 2.3 ms 439.4 ( 1.4 ms … 4.9 ms) 2.5 ms 4.1 ms 4.9 ms
summary summary
pglue pglue
1.52x slower than postgres.js 1.34x slower than postgres-js
2.00x faster than deno-postgres 2.00x faster than deno-postgres
group insert n=100 group insert n=100
pglue 9.2 ms 109.0 ( 5.5 ms … 15.6 ms) 10.4 ms 15.6 ms 15.6 ms pglue 8.3 ms 121.0 ( 5.0 ms … 13.6 ms) 9.3 ms 13.6 ms 13.6 ms
postgres.js 14.8 ms 67.4 ( 9.6 ms … 35.8 ms) 16.6 ms 35.8 ms 35.8 ms postgres-js 13.0 ms 76.7 ( 9.0 ms … 26.9 ms) 14.1 ms 26.9 ms 26.9 ms
deno-postgres 18.8 ms 53.1 ( 14.5 ms … 25.8 ms) 20.9 ms 25.8 ms 25.8 ms deno-postgres 19.8 ms 50.5 ( 14.2 ms … 31.8 ms) 22.5 ms 31.8 ms 31.8 ms
summary summary
pglue pglue
1.62x faster than postgres.js 1.58x faster than postgres-js
2.05x faster than deno-postgres 2.40x faster than deno-postgres
group insert n=200 group insert n=200
pglue 15.0 ms 66.6 ( 11.1 ms … 19.0 ms) 16.7 ms 19.0 ms 19.0 ms pglue 15.1 ms 66.2 ( 9.4 ms … 21.1 ms) 16.8 ms 21.1 ms 21.1 ms
postgres.js 28.1 ms 35.6 ( 22.8 ms … 40.0 ms) 29.1 ms 40.0 ms 40.0 ms postgres-js 27.8 ms 36.0 ( 22.5 ms … 39.2 ms) 30.2 ms 39.2 ms 39.2 ms
deno-postgres 35.9 ms 27.9 ( 29.7 ms … 46.5 ms) 37.2 ms 46.5 ms 46.5 ms deno-postgres 40.6 ms 24.6 ( 33.5 ms … 51.4 ms) 42.2 ms 51.4 ms 51.4 ms
summary summary
pglue pglue
1.87x faster than postgres.js 1.84x faster than postgres-js
2.39x faster than deno-postgres 2.68x faster than deno-postgres
``` ```
[1]: https://github.com/porsager/postgres [1]: https://github.com/porsager/postgres

View File

@@ -1,4 +1,4 @@
import * as pglue from "./mod.ts"; import pglue from "./mod.ts";
import postgres_js from "https://deno.land/x/postgresjs/mod.js"; import postgres_js from "https://deno.land/x/postgresjs/mod.js";
import * as deno_postgres from "https://deno.land/x/postgres/mod.ts"; import * as deno_postgres from "https://deno.land/x/postgres/mod.ts";
@@ -60,7 +60,7 @@ for (const n of [1, 5, 10]) {
}); });
Deno.bench({ Deno.bench({
name: `postgres.js`, name: `postgres-js`,
group: `select n=${n}`, group: `select n=${n}`,
async fn(b) { async fn(b) {
await bench_select(b, n, () => c_pgjs`select * from pg_type`); await bench_select(b, n, () => c_pgjs`select * from pg_type`);
@@ -95,7 +95,7 @@ for (const n of [1, 10, 100, 200]) {
}); });
Deno.bench({ Deno.bench({
name: `postgres.js`, name: `postgres-js`,
group: `insert n=${n}`, group: `insert n=${n}`,
async fn(b) { async fn(b) {
await c_pgjs`begin`; await c_pgjs`begin`;

View File

@@ -1,9 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.1.0", "version": "0.1.3",
"exports": "./mod.ts", "exports": "./mod.ts"
"tasks": {
"test": "deno run --watch -A mod_test.ts",
"bench": "deno bench --watch -A"
}
} }

39
deno.lock generated
View File

@@ -2,23 +2,50 @@
"version": "4", "version": "4",
"specifiers": { "specifiers": {
"jsr:@badrap/valita@~0.4.2": "0.4.2", "jsr:@badrap/valita@~0.4.2": "0.4.2",
"jsr:@std/assert@^1.0.10": "1.0.10",
"jsr:@std/bytes@^1.0.3": "1.0.4",
"jsr:@std/bytes@^1.0.4": "1.0.4", "jsr:@std/bytes@^1.0.4": "1.0.4",
"jsr:@std/encoding@^1.0.6": "1.0.6", "jsr:@std/encoding@^1.0.6": "1.0.6",
"jsr:@std/expect@*": "1.0.11",
"jsr:@std/internal@^1.0.5": "1.0.5",
"jsr:@std/path@^1.0.8": "1.0.8", "jsr:@std/path@^1.0.8": "1.0.8",
"jsr:@std/streams@*": "1.0.8",
"npm:pg-connection-string@^2.7.0": "2.7.0" "npm:pg-connection-string@^2.7.0": "2.7.0"
}, },
"jsr": { "jsr": {
"@badrap/valita@0.4.2": { "@badrap/valita@0.4.2": {
"integrity": "af8a829e82eac71adbc7b60352798f94dcc66d19fab16b657957ca9e646c25fd" "integrity": "af8a829e82eac71adbc7b60352798f94dcc66d19fab16b657957ca9e646c25fd"
}, },
"@std/assert@1.0.10": {
"integrity": "59b5cbac5bd55459a19045d95cc7c2ff787b4f8527c0dd195078ff6f9481fbb3",
"dependencies": [
"jsr:@std/internal"
]
},
"@std/bytes@1.0.4": { "@std/bytes@1.0.4": {
"integrity": "11a0debe522707c95c7b7ef89b478c13fb1583a7cfb9a85674cd2cc2e3a28abc" "integrity": "11a0debe522707c95c7b7ef89b478c13fb1583a7cfb9a85674cd2cc2e3a28abc"
}, },
"@std/encoding@1.0.6": { "@std/encoding@1.0.6": {
"integrity": "ca87122c196e8831737d9547acf001766618e78cd8c33920776c7f5885546069" "integrity": "ca87122c196e8831737d9547acf001766618e78cd8c33920776c7f5885546069"
}, },
"@std/expect@1.0.11": {
"integrity": "5aa5d5cf891e9a3249e45ea770de15189e5a2faee2122ee5746b10d1c310a19b",
"dependencies": [
"jsr:@std/assert",
"jsr:@std/internal"
]
},
"@std/internal@1.0.5": {
"integrity": "54a546004f769c1ac9e025abd15a76b6671ddc9687e2313b67376125650dc7ba"
},
"@std/path@1.0.8": { "@std/path@1.0.8": {
"integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be" "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
},
"@std/streams@1.0.8": {
"integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3",
"dependencies": [
"jsr:@std/bytes@^1.0.3"
]
} }
}, },
"npm": { "npm": {
@@ -433,11 +460,11 @@
"https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465", "https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465",
"https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b", "https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b",
"https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed", "https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e", "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6", "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438", "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff", "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53", "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13" "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
} }
} }

View File

@@ -1 +1 @@
export * from "https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts"; export * from "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts";

2
mod.ts
View File

@@ -9,7 +9,7 @@ import {
unknown, unknown,
} from "./valita.ts"; } from "./valita.ts";
import { Pool, wire_connect, type LogLevel } from "./wire.ts"; import { Pool, wire_connect, type LogLevel } from "./wire.ts";
import { sql_types, type SqlType, type SqlTypeMap } from "./query.ts"; import { sql_types, type SqlTypeMap } from "./query.ts";
export { export {
WireError, WireError,

View File

@@ -1,23 +0,0 @@
import postgres from "./mod.ts";
await using pool = postgres(`postgres://test:test@localhost:5432/test`, {
runtime_params: { client_min_messages: "INFO" },
});
pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx));
await pool.begin(async (pg, tx) => {
await pg.query`
create table my_test (
key integer primary key generated always as identity,
data text not null
)
`;
await pg.query`
insert into my_test (data) values (${[1, 2, 3]}::bytea)
`;
console.log(await pg.query`select * from my_test`);
await tx.rollback();
});

View File

@@ -127,10 +127,36 @@ export const bool: SqlType = {
return s !== "f"; return s !== "f";
}, },
output(x) { output(x) {
return typeof x === "undefined" || x === null ? null : x ? "t" : "f"; if (typeof x === "undefined" || x === null) return null;
const b = bool_names[String(x).toLowerCase()];
if (typeof b === "boolean") return b ? "t" : "f";
else throw new SqlTypeError(`invalid bool output '${x}'`);
}, },
}; };
const bool_names: Partial<Record<string, boolean>> = {
// https://www.postgresql.org/docs/current/datatype-boolean.html#DATATYPE-BOOLEAN
t: true,
tr: true,
tru: true,
true: true,
y: true,
ye: true,
yes: true,
on: true,
1: true,
f: false,
fa: false,
fal: false,
fals: false,
false: false,
n: false,
no: false,
of: false,
off: false,
0: false,
};
export const text: SqlType = { export const text: SqlType = {
input(s) { input(s) {
return s; return s;
@@ -401,7 +427,7 @@ export class Query<T = Row>
async first(): Promise<Result<T>> { async first(): Promise<Result<T>> {
const { rows, tag } = await this.collect(1); const { rows, tag } = await this.collect(1);
if (!rows.length) throw new Error(`expected one row, got none instead`); if (!rows.length) throw new TypeError(`expected one row, got none instead`);
const row = rows[0]; const row = rows[0];
return Object.assign([row] as const, { row: rows[0], tag }); return Object.assign([row] as const, { row: rows[0], tag });
} }
@@ -417,8 +443,9 @@ export class Query<T = Row>
let next; let next;
const rows = []; const rows = [];
for (let i = 0; !(next = await iter.next()).done; ) { for (let i = 0; !(next = await iter.next()).done; ) {
const { value: c } = next; const chunk = next.value;
for (let j = 0, n = c.length; i < count && j < n; ) rows[i++] = c[j++]; for (let j = 0, n = chunk.length; i < count && j < n; )
rows[i++] = chunk[j++];
} }
return Object.assign(rows, next.value, { rows }); return Object.assign(rows, next.value, { rows });
} }
@@ -453,7 +480,8 @@ function str_to_stream(s: string) {
return new ReadableStream({ return new ReadableStream({
type: "bytes", type: "bytes",
start(c) { start(c) {
c.enqueue(to_utf8(s)), c.close(); if (s.length !== 0) c.enqueue(to_utf8(s));
c.close();
}, },
}); });
} }

209
test.ts Normal file
View File

@@ -0,0 +1,209 @@
import pglue, { PostgresError, SqlTypeError } from "./mod.ts";
import { expect } from "jsr:@std/expect";
import { toText } from "jsr:@std/streams";
async function connect() {
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
runtime_params: { client_min_messages: "INFO" },
});
return pg.on("log", (_level, ctx, msg) => {
console.info(`${msg}`, ctx);
});
}
Deno.test(`integers`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query`
select
${"0x100"}::int2 as a,
${777}::int4 as b,
${{
[Symbol.toPrimitive](hint: string) {
expect(hint).toBe("number");
return "1234";
},
}}::int8 as c
`.first();
expect(a).toBe(0x100);
expect(b).toBe(777);
expect(c).toBe(1234);
const [{ large }] =
await pg.query`select ${"10000000000000000"}::int8 as large`.first();
expect(large).toBe(10000000000000000n);
await expect(pg.query`select ${100000}::int2`).rejects.toThrow(SqlTypeError);
await expect(pg.query`select ${"100000"}::text::int2`).rejects.toThrow(
PostgresError
);
});
Deno.test(`boolean`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const [{ a, b, c }] = await pg.query`
select
${true}::bool as a,
${"n"}::bool as b,
${undefined}::bool as c
`.first();
expect(a).toBe(true);
expect(b).toBe(false);
expect(c).toBe(null);
});
Deno.test(`bytea`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const [{ string, array, buffer }] = await pg.query`
select
${"hello, world"}::bytea as string,
${[1, 2, 3, 4, 5]}::bytea as array,
${Uint8Array.of(5, 4, 3, 2, 1)}::bytea as buffer
`.first();
expect(string).toEqual(new TextEncoder().encode("hello, world"));
expect(array).toEqual(Uint8Array.of(1, 2, 3, 4, 5));
expect(buffer).toEqual(Uint8Array.of(5, 4, 3, 2, 1));
});
Deno.test(`row`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
expect(
(
await pg.query`create table my_table (a text not null, b text not null, c text not null)`
).tag
).toBe(`CREATE TABLE`);
expect(
(
await pg.query`copy my_table from stdin`.stdin(
`field a\tfield b\tfield c`
)
).tag
).toBe(`COPY 1`);
const [row] = await pg.query`select * from my_table`.first();
{
// columns by name
const { a, b, c } = row;
expect(a).toBe("field a");
expect(b).toBe("field b");
expect(c).toBe("field c");
}
{
// columns by index
const [a, b, c] = row;
expect(a).toBe("field a");
expect(b).toBe("field b");
expect(c).toBe("field c");
}
const { readable, writable } = new TransformStream<Uint8Array>(
{},
new ByteLengthQueuingStrategy({ highWaterMark: 4096 }),
new ByteLengthQueuingStrategy({ highWaterMark: 4096 })
);
await pg.query`copy my_table to stdout`.stdout(writable);
expect(await toText(readable)).toBe(`field a\tfield b\tfield c\n`);
});
Deno.test(`sql injection`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
const input = `injection'); drop table users; --`;
expect((await pg.query`create table users (name text not null)`).tag).toBe(
`CREATE TABLE`
);
expect((await pg.query`insert into users (name) values (${input})`).tag).toBe(
`INSERT 0 1`
);
const [{ name }] = await pg.query<{ name: string }>`
select name from users
`.first();
expect(name).toBe(input);
});
Deno.test(`pubsub`, async () => {
await using pg = await connect();
const sent: string[] = [];
await using ch = await pg.listen(`my channel`, (payload) => {
expect(payload).toBe(sent.shift());
});
for (let i = 0; i < 5; i++) {
const payload = `test payload ${i}`;
sent.push(payload);
await ch.notify(payload);
}
});
Deno.test(`transactions`, async () => {
await using pg = await connect();
await pg.begin(async (pg) => {
await pg.begin(async (pg, tx) => {
await pg.query`create table my_table (field text not null)`;
await tx.rollback();
});
await expect(pg.query`select * from my_table`).rejects.toThrow(
PostgresError
);
});
await expect(pg.query`select * from my_table`).rejects.toThrow(PostgresError);
await pg.begin(async (pg) => {
await pg.begin(async (pg, tx) => {
await pg.begin(async (pg, tx) => {
await pg.begin(async (pg) => {
await pg.query`create table my_table (field text not null)`;
});
await tx.commit();
});
expect(await pg.query`select * from my_table`.count()).toBe(0);
await tx.rollback();
});
await expect(pg.query`select * from my_table`).rejects.toThrow(
PostgresError
);
});
});
Deno.test(`streaming`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
await pg.query`create table my_table (field text not null)`;
for (let i = 0; i < 100; i++) {
await pg.query`insert into my_table (field) values (${i})`;
}
let i = 0;
for await (const chunk of pg.query`select * from my_table`.chunked(10)) {
expect(chunk.length).toBe(10);
for (const row of chunk) expect(row.field).toBe(`${i++}`);
}
expect(i).toBe(100);
});

97
wire.ts
View File

@@ -528,8 +528,8 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
(this.#connected = this.#auth()).catch(close); (this.#connected = this.#auth()).catch(close);
} }
query(sql: SqlFragment): Query; query<T = Row>(sql: SqlFragment): Query<T>;
query(s: TemplateStringsArray, ...xs: unknown[]): Query; query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) { query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
return this.#query(is_sql(s) ? s : sql(s, ...xs)); return this.#query(is_sql(s) ? s : sql(s, ...xs));
} }
@@ -557,9 +557,9 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
return this.#notify(channel, payload); return this.#notify(channel, payload);
} }
async get(param: string, missing_null = true) { async get(param: string) {
return ( return (
await this.query`select current_setting(${param}, ${missing_null})` await this.query`select current_setting(${param}, true)`
.map(([s]) => String(s)) .map(([s]) => String(s))
.first_or(null) .first_or(null)
)[0]; )[0];
@@ -579,6 +579,11 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
} }
} }
const msg_PD = object({ P: Parse, D: Describe });
const msg_BE = object({ B: Bind, E: Execute });
const msg_BEc = object({ B: Bind, E: Execute, c: CopyDone });
const msg_BEcC = object({ B: Bind, E: Execute, c: CopyDone, C: Close });
function wire_impl( function wire_impl(
wire: Wire, wire: Wire,
socket: Deno.Conn, socket: Deno.Conn,
@@ -609,6 +614,7 @@ function wire_impl(
} }
const read_recv = channel.receiver<Uint8Array>(async function read(send) { const read_recv = channel.receiver<Uint8Array>(async function read(send) {
let err: unknown;
try { try {
let buf = new Uint8Array(); let buf = new Uint8Array();
for await (const chunk of read_socket()) { for await (const chunk of read_socket()) {
@@ -656,9 +662,10 @@ function wire_impl(
} }
if (buf.length !== 0) throw new WireError(`unexpected end of stream`); if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
wire.emit("close");
} catch (e) { } catch (e) {
wire.emit("close", e); throw ((err = e), e);
} finally {
wire.emit("close", err);
} }
}); });
@@ -690,23 +697,31 @@ function wire_impl(
} }
function pipeline_read<T>(r: () => T | PromiseLike<T>) { function pipeline_read<T>(r: () => T | PromiseLike<T>) {
return rlock(async () => { return rlock(async function pipeline_read() {
try { try {
return await r(); return await r();
} finally { } finally {
try {
let msg; let msg;
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); while (msg_type((msg = await read_raw())) !== ReadyForQuery.type);
({ tx_status } = ser_decode(ReadyForQuery, msg)); ({ tx_status } = ser_decode(ReadyForQuery, msg));
} catch {
// ignored
}
} }
}); });
} }
function pipeline_write<T>(w: () => T | PromiseLike<T>) { function pipeline_write<T>(w: () => T | PromiseLike<T>) {
return wlock(async () => { return wlock(async function pipeline_write() {
try { try {
return await w(); return await w();
} finally { } finally {
try {
await write(Sync, {}); await write(Sync, {});
} catch {
// ignored
}
} }
}); });
} }
@@ -894,10 +909,11 @@ function wire_impl(
try { try {
const { name, query } = this; const { name, query } = this;
return await pipeline( return await pipeline(
async () => { () =>
await write(Parse, { statement: name, query, param_types: [] }); write(msg_PD, {
await write(Describe, { which: "S", name }); P: { statement: name, query, param_types: [] },
}, D: { which: "S", name },
}),
async () => { async () => {
await read(ParseComplete); await read(ParseComplete);
const param_desc = await read(ParameterDescription); const param_desc = await read(ParameterDescription);
@@ -909,8 +925,8 @@ function wire_impl(
: ser_decode(RowDescription, msg); : ser_decode(RowDescription, msg);
return { return {
ser_params: param_ser(param_desc), ser_params: make_param_ser(param_desc),
Row: row_ctor(row_desc), Row: make_row_ctor(row_desc),
}; };
} }
); );
@@ -930,7 +946,7 @@ function wire_impl(
(params: unknown[]): (string | null)[]; (params: unknown[]): (string | null)[];
} }
function param_ser({ param_types }: ParameterDescription) { function make_param_ser({ param_types }: ParameterDescription) {
return jit.compiled<ParameterSerializer>`function ser_params(xs) { return jit.compiled<ParameterSerializer>`function ser_params(xs) {
return [ return [
${jit.map(", ", param_types, (type_oid, i) => { ${jit.map(", ", param_types, (type_oid, i) => {
@@ -946,7 +962,7 @@ function wire_impl(
new (columns: (BinaryLike | null)[]): Row; new (columns: (BinaryLike | null)[]): Row;
} }
function row_ctor({ columns }: RowDescription) { function make_row_ctor({ columns }: RowDescription) {
const Row = jit.compiled<RowConstructor>`function Row(xs) { const Row = jit.compiled<RowConstructor>`function Row(xs) {
${jit.map(" ", columns, ({ name, type_oid }, i) => { ${jit.map(" ", columns, ({ name, type_oid }, i) => {
const type = types[type_oid] ?? text; const type = types[type_oid] ?? text;
@@ -1019,6 +1035,10 @@ function wire_impl(
const { data } = ser_decode(CopyData, msg_check_err(msg)); const { data } = ser_decode(CopyData, msg_check_err(msg));
await writer.write(to_utf8(data)); await writer.write(to_utf8(data));
} }
await writer.close();
} catch (e) {
await writer.abort(e);
throw e;
} finally { } finally {
writer.releaseLock(); writer.releaseLock();
} }
@@ -1030,17 +1050,13 @@ function wire_impl(
async function write_copy_in(stream: ReadableStream<Uint8Array> | null) { async function write_copy_in(stream: ReadableStream<Uint8Array> | null) {
if (stream !== null) { if (stream !== null) {
const reader = stream.getReader(); const reader = stream.getReader();
let err;
try {
try { try {
for (let next; !(next = await reader.read()).done; ) for (let next; !(next = await reader.read()).done; )
await write(CopyData, { data: next.value }); await write(CopyData, { data: next.value });
await write(CopyDone, {});
} catch (e) { } catch (e) {
err = e; await write(CopyFail, { cause: String(e) });
} finally { throw e;
if (typeof err === "undefined") await write(CopyDone, {});
else await write(CopyFail, { cause: String(err) });
}
} finally { } finally {
reader.releaseLock(); reader.releaseLock();
} }
@@ -1068,16 +1084,23 @@ function wire_impl(
try { try {
const { rows, tag } = await pipeline( const { rows, tag } = await pipeline(
async () => { async () => {
await write(Bind, { const B = {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}); };
await write(Execute, { portal, row_limit: 0 }); const E = { portal, row_limit: 0 };
const C = { which: "P" as const, name: portal };
if (stdin !== null) {
await write(msg_BE, { B, E });
await write_copy_in(stdin); await write_copy_in(stdin);
await write(Close, { which: "P" as const, name: portal }); await write(Close, C);
} else {
return write(msg_BEcC, { B, E, c: {}, C });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);
@@ -1088,10 +1111,14 @@ function wire_impl(
if (rows.length) yield rows; if (rows.length) yield rows;
return { tag }; return { tag };
} catch (e) { } catch (e) {
try {
await pipeline( await pipeline(
() => write(Close, { which: "P" as const, name: portal }), () => write(Close, { which: "P" as const, name: portal }),
() => read(CloseComplete) () => read(CloseComplete)
); );
} catch {
// ignored
}
throw e; throw e;
} }
@@ -1117,15 +1144,21 @@ function wire_impl(
try { try {
let { done, rows, tag } = await pipeline( let { done, rows, tag } = await pipeline(
async () => { async () => {
await write(Bind, { const B = {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}); };
await write(Execute, { portal, row_limit: chunk_size }); const E = { portal, row_limit: chunk_size };
if (stdin !== null) {
await write(msg_BE, { B, E });
await write_copy_in(stdin); await write_copy_in(stdin);
} else {
return write(msg_BEc, { B, E, c: {} });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);
@@ -1320,8 +1353,8 @@ export class Pool
} }
} }
query(sql: SqlFragment): Query; query<T = Row>(sql: SqlFragment): Query<T>;
query(s: TemplateStringsArray, ...xs: unknown[]): Query; query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) { query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
s = is_sql(s) ? s : sql(s, ...xs); s = is_sql(s) ? s : sql(s, ...xs);
const acquire = this.#acquire; const acquire = this.#acquire;