Compare commits
11 Commits
a88da00dec
...
v0.2.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
00002525e4
|
|||
|
328cc63536
|
|||
|
29b79f25c0
|
|||
|
29b2796627
|
|||
|
02f8098811
|
|||
|
da7f7e12f3
|
|||
|
6f9e9770cf
|
|||
|
137422601b
|
|||
|
3793e14f50
|
|||
|
b194397645
|
|||
|
858b7a95f3
|
80
README.md
80
README.md
@@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL.
|
|||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
```ts
|
```ts
|
||||||
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/0.1.1/mod.ts";
|
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.2.0/mod.ts";
|
||||||
// ...or from github:
|
// ...or from github:
|
||||||
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/0.1.1/mod.ts";
|
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.2.0/mod.ts";
|
||||||
```
|
```
|
||||||
|
|
||||||
## Documentation
|
## Documentation
|
||||||
@@ -25,13 +25,13 @@ TODO: Write the documentation in more detail here.
|
|||||||
|
|
||||||
## Benchmarks
|
## Benchmarks
|
||||||
|
|
||||||
Performance is generally on par with [postgres-js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
|
Performance is generally on par with [postgres.js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
|
||||||
|
|
||||||
Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
|
Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
|
||||||
|
|
||||||
Query `select * from pg_type`:
|
Query `select * from pg_type`:
|
||||||
|
|
||||||
```log
|
```
|
||||||
CPU | Common KVM Processor v2.0
|
CPU | Common KVM Processor v2.0
|
||||||
Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu)
|
Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu)
|
||||||
|
|
||||||
@@ -39,78 +39,78 @@ benchmark time/iter (avg) iter/s (min … max) p75
|
|||||||
--------------- ----------------------------- --------------------- --------------------------
|
--------------- ----------------------------- --------------------- --------------------------
|
||||||
|
|
||||||
group select n=1
|
group select n=1
|
||||||
pglue 8.3 ms 120.4 ( 7.2 ms … 14.4 ms) 8.5 ms 14.4 ms 14.4 ms
|
pglue 8.8 ms 113.8 ( 7.2 ms … 11.8 ms) 9.7 ms 11.8 ms 11.8 ms
|
||||||
postgres-js 10.8 ms 92.3 ( 8.1 ms … 26.5 ms) 10.7 ms 26.5 ms 26.5 ms
|
postgres.js 10.8 ms 92.3 ( 8.1 ms … 22.0 ms) 11.2 ms 22.0 ms 22.0 ms
|
||||||
deno-postgres 37.1 ms 26.9 ( 33.4 ms … 41.3 ms) 38.5 ms 41.3 ms 41.3 ms
|
deno-postgres 38.9 ms 25.7 ( 23.5 ms … 51.9 ms) 40.3 ms 51.9 ms 51.9 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.30x faster than postgres-js
|
1.23x faster than postgres.js
|
||||||
4.47x faster than deno-postgres
|
4.42x faster than deno-postgres
|
||||||
|
|
||||||
group select n=5
|
group select n=5
|
||||||
pglue 39.9 ms 25.1 ( 37.2 ms … 49.6 ms) 40.8 ms 49.6 ms 49.6 ms
|
pglue 40.1 ms 25.0 ( 36.1 ms … 48.2 ms) 40.7 ms 48.2 ms 48.2 ms
|
||||||
postgres-js 42.4 ms 23.6 ( 36.5 ms … 61.8 ms) 44.2 ms 61.8 ms 61.8 ms
|
postgres.js 48.7 ms 20.5 ( 38.9 ms … 61.2 ms) 52.7 ms 61.2 ms 61.2 ms
|
||||||
deno-postgres 182.5 ms 5.5 (131.9 ms … 211.8 ms) 193.4 ms 211.8 ms 211.8 ms
|
deno-postgres 184.7 ms 5.4 (166.5 ms … 209.5 ms) 190.7 ms 209.5 ms 209.5 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.06x faster than postgres-js
|
1.22x faster than postgres.js
|
||||||
4.57x faster than deno-postgres
|
4.61x faster than deno-postgres
|
||||||
|
|
||||||
group select n=10
|
group select n=10
|
||||||
pglue 78.9 ms 12.7 ( 72.3 ms … 88.9 ms) 82.5 ms 88.9 ms 88.9 ms
|
pglue 80.7 ms 12.4 ( 73.5 ms … 95.4 ms) 82.2 ms 95.4 ms 95.4 ms
|
||||||
postgres-js 92.0 ms 10.9 ( 77.6 ms … 113.6 ms) 101.2 ms 113.6 ms 113.6 ms
|
postgres.js 89.1 ms 11.2 ( 82.5 ms … 101.7 ms) 94.4 ms 101.7 ms 101.7 ms
|
||||||
deno-postgres 326.6 ms 3.1 (208.8 ms … 406.0 ms) 388.8 ms 406.0 ms 406.0 ms
|
deno-postgres 375.3 ms 2.7 (327.4 ms … 393.9 ms) 390.7 ms 393.9 ms 393.9 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.17x faster than postgres-js
|
1.10x faster than postgres.js
|
||||||
4.14x faster than deno-postgres
|
4.65x faster than deno-postgres
|
||||||
```
|
```
|
||||||
|
|
||||||
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
|
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
|
||||||
|
|
||||||
```log
|
```
|
||||||
group insert n=1
|
group insert n=1
|
||||||
pglue 303.3 µs 3,297 (165.6 µs … 2.4 ms) 321.6 µs 1.1 ms 2.4 ms
|
pglue 259.2 µs 3,858 (165.4 µs … 2.8 ms) 258.0 µs 775.4 µs 2.8 ms
|
||||||
postgres-js 260.4 µs 3,840 (132.9 µs … 2.7 ms) 276.4 µs 1.1 ms 2.7 ms
|
postgres.js 235.9 µs 4,239 (148.8 µs … 1.2 ms) 250.3 µs 577.4 µs 585.6 µs
|
||||||
deno-postgres 281.6 µs 3,552 (186.1 µs … 1.5 ms) 303.8 µs 613.6 µs 791.8 µs
|
deno-postgres 306.7 µs 3,260 (198.8 µs … 1.3 ms) 325.9 µs 1.0 ms 1.3 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.17x slower than postgres-js
|
1.10x slower than postgres.js
|
||||||
1.08x slower than deno-postgres
|
1.18x faster than deno-postgres
|
||||||
|
|
||||||
group insert n=10
|
group insert n=10
|
||||||
pglue 1.1 ms 878.5 (605.5 µs … 3.2 ms) 1.1 ms 2.2 ms 3.2 ms
|
pglue 789.7 µs 1,266 (553.2 µs … 2.7 ms) 783.4 µs 2.4 ms 2.7 ms
|
||||||
postgres-js 849.3 µs 1,177 (529.5 µs … 10.1 ms) 770.6 µs 3.0 ms 10.1 ms
|
postgres.js 755.6 µs 1,323 (500.5 µs … 3.4 ms) 795.0 µs 2.8 ms 3.4 ms
|
||||||
deno-postgres 2.3 ms 439.4 ( 1.4 ms … 4.9 ms) 2.5 ms 4.1 ms 4.9 ms
|
deno-postgres 2.2 ms 458.1 ( 1.6 ms … 5.2 ms) 2.3 ms 4.8 ms 5.2 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.34x slower than postgres-js
|
1.04x slower than postgres.js
|
||||||
2.00x faster than deno-postgres
|
2.76x faster than deno-postgres
|
||||||
|
|
||||||
group insert n=100
|
group insert n=100
|
||||||
pglue 8.3 ms 121.0 ( 5.0 ms … 13.6 ms) 9.3 ms 13.6 ms 13.6 ms
|
pglue 5.8 ms 172.0 ( 3.2 ms … 9.9 ms) 6.8 ms 9.9 ms 9.9 ms
|
||||||
postgres-js 13.0 ms 76.7 ( 9.0 ms … 26.9 ms) 14.1 ms 26.9 ms 26.9 ms
|
postgres.js 13.0 ms 76.8 ( 8.6 ms … 20.8 ms) 15.4 ms 20.8 ms 20.8 ms
|
||||||
deno-postgres 19.8 ms 50.5 ( 14.2 ms … 31.8 ms) 22.5 ms 31.8 ms 31.8 ms
|
deno-postgres 18.5 ms 54.1 ( 14.3 ms … 32.1 ms) 20.0 ms 32.1 ms 32.1 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.58x faster than postgres-js
|
2.24x faster than postgres.js
|
||||||
2.40x faster than deno-postgres
|
3.18x faster than deno-postgres
|
||||||
|
|
||||||
group insert n=200
|
group insert n=200
|
||||||
pglue 15.1 ms 66.2 ( 9.4 ms … 21.1 ms) 16.8 ms 21.1 ms 21.1 ms
|
pglue 8.8 ms 113.4 ( 6.0 ms … 14.1 ms) 10.0 ms 14.1 ms 14.1 ms
|
||||||
postgres-js 27.8 ms 36.0 ( 22.5 ms … 39.2 ms) 30.2 ms 39.2 ms 39.2 ms
|
postgres.js 28.2 ms 35.5 ( 21.1 ms … 47.0 ms) 29.6 ms 47.0 ms 47.0 ms
|
||||||
deno-postgres 40.6 ms 24.6 ( 33.5 ms … 51.4 ms) 42.2 ms 51.4 ms 51.4 ms
|
deno-postgres 37.0 ms 27.0 ( 32.0 ms … 48.1 ms) 39.4 ms 48.1 ms 48.1 ms
|
||||||
|
|
||||||
summary
|
summary
|
||||||
pglue
|
pglue
|
||||||
1.84x faster than postgres-js
|
3.20x faster than postgres.js
|
||||||
2.68x faster than deno-postgres
|
4.20x faster than deno-postgres
|
||||||
```
|
```
|
||||||
|
|
||||||
[1]: https://github.com/porsager/postgres
|
[1]: https://github.com/porsager/postgres
|
||||||
|
|||||||
4
bench.ts
4
bench.ts
@@ -60,7 +60,7 @@ for (const n of [1, 5, 10]) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
Deno.bench({
|
Deno.bench({
|
||||||
name: `postgres-js`,
|
name: `postgres.js`,
|
||||||
group: `select n=${n}`,
|
group: `select n=${n}`,
|
||||||
async fn(b) {
|
async fn(b) {
|
||||||
await bench_select(b, n, () => c_pgjs`select * from pg_type`);
|
await bench_select(b, n, () => c_pgjs`select * from pg_type`);
|
||||||
@@ -95,7 +95,7 @@ for (const n of [1, 10, 100, 200]) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
Deno.bench({
|
Deno.bench({
|
||||||
name: `postgres-js`,
|
name: `postgres.js`,
|
||||||
group: `insert n=${n}`,
|
group: `insert n=${n}`,
|
||||||
async fn(b) {
|
async fn(b) {
|
||||||
await c_pgjs`begin`;
|
await c_pgjs`begin`;
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"name": "@luaneko/pglue",
|
"name": "@luaneko/pglue",
|
||||||
"version": "0.1.1",
|
"version": "0.2.0",
|
||||||
"exports": "./mod.ts"
|
"exports": "./mod.ts"
|
||||||
}
|
}
|
||||||
|
|||||||
12
deno.lock
generated
12
deno.lock
generated
@@ -460,11 +460,11 @@
|
|||||||
"https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465",
|
"https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465",
|
||||||
"https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b",
|
"https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b",
|
||||||
"https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed",
|
"https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
|
||||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
|
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
lstd.ts
2
lstd.ts
@@ -1 +1 @@
|
|||||||
export * from "https://git.lua.re/luaneko/lstd/raw/tag/0.2.0/mod.ts";
|
export * from "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts";
|
||||||
|
|||||||
12
mod.ts
12
mod.ts
@@ -8,7 +8,7 @@ import {
|
|||||||
union,
|
union,
|
||||||
unknown,
|
unknown,
|
||||||
} from "./valita.ts";
|
} from "./valita.ts";
|
||||||
import { Pool, wire_connect, type LogLevel } from "./wire.ts";
|
import { Pool, wire_connect } from "./wire.ts";
|
||||||
import { sql_types, type SqlTypeMap } from "./query.ts";
|
import { sql_types, type SqlTypeMap } from "./query.ts";
|
||||||
|
|
||||||
export {
|
export {
|
||||||
@@ -61,6 +61,7 @@ const ParsedOptions = object({
|
|||||||
runtime_params: record(string()).optional(() => ({})),
|
runtime_params: record(string()).optional(() => ({})),
|
||||||
max_connections: number().optional(() => 10),
|
max_connections: number().optional(() => 10),
|
||||||
idle_timeout: number().optional(() => 20),
|
idle_timeout: number().optional(() => 20),
|
||||||
|
reconnect_delay: number().optional(() => 5),
|
||||||
types: record(unknown())
|
types: record(unknown())
|
||||||
.optional(() => ({}))
|
.optional(() => ({}))
|
||||||
.map((types): SqlTypeMap => ({ ...sql_types, ...types })),
|
.map((types): SqlTypeMap => ({ ...sql_types, ...types })),
|
||||||
@@ -101,10 +102,6 @@ export function connect(s: string, options: Options = {}) {
|
|||||||
|
|
||||||
postgres.connect = connect;
|
postgres.connect = connect;
|
||||||
|
|
||||||
export type PostgresEvents = {
|
|
||||||
log(level: LogLevel, ctx: object, msg: string): void;
|
|
||||||
};
|
|
||||||
|
|
||||||
export class Postgres extends Pool {
|
export class Postgres extends Pool {
|
||||||
readonly #options;
|
readonly #options;
|
||||||
|
|
||||||
@@ -113,8 +110,9 @@ export class Postgres extends Pool {
|
|||||||
this.#options = options;
|
this.#options = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect(options: Options = {}) {
|
||||||
const wire = await wire_connect(this.#options);
|
const opts = ParsedOptions.parse({ ...this.#options, ...options });
|
||||||
|
const wire = await wire_connect(opts);
|
||||||
return wire.on("log", (l, c, s) => this.emit("log", l, c, s));
|
return wire.on("log", (l, c, s) => this.emit("log", l, c, s));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
6
query.ts
6
query.ts
@@ -345,6 +345,7 @@ export interface Row extends Iterable<unknown, void, void> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface QueryOptions {
|
export interface QueryOptions {
|
||||||
|
readonly simple: boolean;
|
||||||
readonly chunk_size: number;
|
readonly chunk_size: number;
|
||||||
readonly stdin: ReadableStream<Uint8Array> | null;
|
readonly stdin: ReadableStream<Uint8Array> | null;
|
||||||
readonly stdout: WritableStream<Uint8Array> | null;
|
readonly stdout: WritableStream<Uint8Array> | null;
|
||||||
@@ -359,6 +360,11 @@ export class Query<T = Row>
|
|||||||
this.#f = f;
|
this.#f = f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
simple(simple = true) {
|
||||||
|
const f = this.#f;
|
||||||
|
return new Query((o) => f({ simple, ...o }));
|
||||||
|
}
|
||||||
|
|
||||||
chunked(chunk_size = 1) {
|
chunked(chunk_size = 1) {
|
||||||
const f = this.#f;
|
const f = this.#f;
|
||||||
return new Query((o) => f({ chunk_size, ...o }));
|
return new Query((o) => f({ chunk_size, ...o }));
|
||||||
|
|||||||
47
test.ts
47
test.ts
@@ -2,9 +2,9 @@ import pglue, { PostgresError, SqlTypeError } from "./mod.ts";
|
|||||||
import { expect } from "jsr:@std/expect";
|
import { expect } from "jsr:@std/expect";
|
||||||
import { toText } from "jsr:@std/streams";
|
import { toText } from "jsr:@std/streams";
|
||||||
|
|
||||||
async function connect() {
|
async function connect(params?: Record<string, string>) {
|
||||||
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
|
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
|
||||||
runtime_params: { client_min_messages: "INFO" },
|
runtime_params: { client_min_messages: "INFO", ...params },
|
||||||
});
|
});
|
||||||
|
|
||||||
return pg.on("log", (_level, ctx, msg) => {
|
return pg.on("log", (_level, ctx, msg) => {
|
||||||
@@ -139,7 +139,7 @@ Deno.test(`sql injection`, async () => {
|
|||||||
expect(name).toBe(input);
|
expect(name).toBe(input);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test(`pubsub`, async () => {
|
Deno.test(`listen/notify`, async () => {
|
||||||
await using pg = await connect();
|
await using pg = await connect();
|
||||||
const sent: string[] = [];
|
const sent: string[] = [];
|
||||||
|
|
||||||
@@ -152,6 +152,8 @@ Deno.test(`pubsub`, async () => {
|
|||||||
sent.push(payload);
|
sent.push(payload);
|
||||||
await ch.notify(payload);
|
await ch.notify(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expect(sent.length).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test(`transactions`, async () => {
|
Deno.test(`transactions`, async () => {
|
||||||
@@ -188,3 +190,42 @@ Deno.test(`transactions`, async () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Deno.test(`streaming`, async () => {
|
||||||
|
await using pg = await connect();
|
||||||
|
await using _tx = await pg.begin();
|
||||||
|
|
||||||
|
await pg.query`create table my_table (field text not null)`;
|
||||||
|
|
||||||
|
for (let i = 0; i < 20; i++) {
|
||||||
|
await pg.query`insert into my_table (field) values (${i})`;
|
||||||
|
}
|
||||||
|
|
||||||
|
let i = 0;
|
||||||
|
for await (const chunk of pg.query`select * from my_table`.chunked(5)) {
|
||||||
|
expect(chunk.length).toBe(5);
|
||||||
|
for (const row of chunk) expect(row.field).toBe(`${i++}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(i).toBe(20);
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.test(`simple`, async () => {
|
||||||
|
await using pg = await connect();
|
||||||
|
await using _tx = await pg.begin();
|
||||||
|
|
||||||
|
const rows = await pg.query`
|
||||||
|
create table my_table (field text not null);
|
||||||
|
insert into my_table (field) values ('one'), ('two'), ('three');
|
||||||
|
select * from my_table;
|
||||||
|
select * from my_table where field = 'two';
|
||||||
|
`.simple();
|
||||||
|
|
||||||
|
expect(rows.length).toBe(4);
|
||||||
|
|
||||||
|
const [{ field: a }, { field: b }, { field: c }, { field: d }] = rows;
|
||||||
|
expect(a).toBe("one");
|
||||||
|
expect(b).toBe("two");
|
||||||
|
expect(c).toBe("three");
|
||||||
|
expect(d).toBe("two");
|
||||||
|
});
|
||||||
|
|||||||
511
wire.ts
511
wire.ts
@@ -1,5 +1,6 @@
|
|||||||
import {
|
import {
|
||||||
type BinaryLike,
|
type BinaryLike,
|
||||||
|
buf_concat,
|
||||||
buf_concat_fast,
|
buf_concat_fast,
|
||||||
buf_eq,
|
buf_eq,
|
||||||
buf_xor,
|
buf_xor,
|
||||||
@@ -7,8 +8,9 @@ import {
|
|||||||
from_base64,
|
from_base64,
|
||||||
from_utf8,
|
from_utf8,
|
||||||
jit,
|
jit,
|
||||||
|
type Receiver,
|
||||||
semaphore,
|
semaphore,
|
||||||
semaphore_fast,
|
type Sender,
|
||||||
to_base64,
|
to_base64,
|
||||||
to_utf8,
|
to_utf8,
|
||||||
TypedEmitter,
|
TypedEmitter,
|
||||||
@@ -34,6 +36,7 @@ import {
|
|||||||
} from "./ser.ts";
|
} from "./ser.ts";
|
||||||
import {
|
import {
|
||||||
type CommandResult,
|
type CommandResult,
|
||||||
|
format,
|
||||||
is_sql,
|
is_sql,
|
||||||
Query,
|
Query,
|
||||||
type ResultStream,
|
type ResultStream,
|
||||||
@@ -88,7 +91,7 @@ export class PostgresError extends WireError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function severity_level(s: string): LogLevel {
|
function severity_log_level(s: string): LogLevel {
|
||||||
switch (s) {
|
switch (s) {
|
||||||
case "DEBUG":
|
case "DEBUG":
|
||||||
return "debug";
|
return "debug";
|
||||||
@@ -445,14 +448,15 @@ export interface WireOptions {
|
|||||||
readonly password: string;
|
readonly password: string;
|
||||||
readonly database: string | null;
|
readonly database: string | null;
|
||||||
readonly runtime_params: Record<string, string>;
|
readonly runtime_params: Record<string, string>;
|
||||||
|
readonly reconnect_delay: number;
|
||||||
readonly types: SqlTypeMap;
|
readonly types: SqlTypeMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type WireEvents = {
|
export type WireEvents = {
|
||||||
log(level: LogLevel, ctx: object, msg: string): void;
|
log(level: LogLevel, ctx: object, msg: string): void;
|
||||||
notice(notice: PostgresError): void;
|
notice(notice: PostgresError): void;
|
||||||
parameter(name: string, value: string, prev: string | null): void;
|
|
||||||
notify(channel: string, payload: string, process_id: number): void;
|
notify(channel: string, payload: string, process_id: number): void;
|
||||||
|
parameter(name: string, value: string, prev: string | null): void;
|
||||||
close(reason?: unknown): void;
|
close(reason?: unknown): void;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -475,57 +479,41 @@ export interface Channel
|
|||||||
}
|
}
|
||||||
|
|
||||||
export async function wire_connect(options: WireOptions) {
|
export async function wire_connect(options: WireOptions) {
|
||||||
const { host, port } = options;
|
const wire = new Wire(options);
|
||||||
const wire = new Wire(await socket_connect(host, port), options);
|
return await wire.connect(), wire;
|
||||||
return await wire.connected, wire;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function socket_connect(hostname: string, port: number) {
|
export class Wire<V extends WireEvents = WireEvents>
|
||||||
if (hostname.startsWith("/")) {
|
extends TypedEmitter<V>
|
||||||
const path = join(hostname, `.s.PGSQL.${port}`);
|
implements Disposable
|
||||||
return await Deno.connect({ transport: "unix", path });
|
{
|
||||||
} else {
|
|
||||||
const socket = await Deno.connect({ transport: "tcp", hostname, port });
|
|
||||||
return socket.setNoDelay(), socket.setKeepAlive(), socket;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
|
||||||
readonly #socket;
|
|
||||||
readonly #params;
|
readonly #params;
|
||||||
readonly #auth;
|
readonly #connect;
|
||||||
readonly #connected;
|
|
||||||
readonly #query;
|
readonly #query;
|
||||||
readonly #begin;
|
readonly #begin;
|
||||||
readonly #listen;
|
readonly #listen;
|
||||||
readonly #notify;
|
readonly #notify;
|
||||||
readonly #close;
|
readonly #close;
|
||||||
|
|
||||||
get socket() {
|
|
||||||
return this.#socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
get params() {
|
get params() {
|
||||||
return this.#params;
|
return this.#params;
|
||||||
}
|
}
|
||||||
|
|
||||||
get connected() {
|
constructor(options: WireOptions) {
|
||||||
return this.#connected;
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(socket: Deno.Conn, options: WireOptions) {
|
|
||||||
super();
|
super();
|
||||||
({
|
({
|
||||||
params: this.#params,
|
params: this.#params,
|
||||||
auth: this.#auth,
|
connect: this.#connect,
|
||||||
query: this.#query,
|
query: this.#query,
|
||||||
begin: this.#begin,
|
begin: this.#begin,
|
||||||
listen: this.#listen,
|
listen: this.#listen,
|
||||||
notify: this.#notify,
|
notify: this.#notify,
|
||||||
close: this.#close,
|
close: this.#close,
|
||||||
} = wire_impl(this, socket, options));
|
} = wire_impl(this, options));
|
||||||
this.#socket = socket;
|
}
|
||||||
(this.#connected = this.#auth()).catch(close);
|
|
||||||
|
connect() {
|
||||||
|
return this.#connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
query<T = Row>(sql: SqlFragment): Query<T>;
|
query<T = Row>(sql: SqlFragment): Query<T>;
|
||||||
@@ -579,107 +567,188 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function socket_connect(hostname: string, port: number) {
|
||||||
|
if (hostname.startsWith("/")) {
|
||||||
|
const path = join(hostname, `.s.PGSQL.${port}`);
|
||||||
|
return await Deno.connect({ transport: "unix", path });
|
||||||
|
} else {
|
||||||
|
const socket = await Deno.connect({ transport: "tcp", hostname, port });
|
||||||
|
return socket.setNoDelay(), socket.setKeepAlive(), socket;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function wire_impl(
|
function wire_impl(
|
||||||
wire: Wire,
|
wire: Wire,
|
||||||
socket: Deno.Conn,
|
{
|
||||||
{ user, database, password, runtime_params, types }: WireOptions
|
host,
|
||||||
|
port,
|
||||||
|
user,
|
||||||
|
database,
|
||||||
|
password,
|
||||||
|
runtime_params,
|
||||||
|
reconnect_delay,
|
||||||
|
types,
|
||||||
|
}: WireOptions
|
||||||
) {
|
) {
|
||||||
|
// current runtime parameters as reported by postgres
|
||||||
const params: Parameters = Object.create(null);
|
const params: Parameters = Object.create(null);
|
||||||
|
|
||||||
function log(level: LogLevel, ctx: object, msg: string) {
|
function log(level: LogLevel, ctx: object, msg: string) {
|
||||||
wire.emit("log", level, ctx, msg);
|
wire.emit("log", level, ctx, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wire supports re-connection; socket and read/write channels are null when closed
|
||||||
|
let connected = false;
|
||||||
|
let should_reconnect = false;
|
||||||
|
let socket: Deno.Conn | null = null;
|
||||||
|
let read_pop: Receiver<Uint8Array> | null = null;
|
||||||
|
let write_push: Sender<Uint8Array> | null = null;
|
||||||
|
|
||||||
async function read<T>(type: Encoder<T>) {
|
async function read<T>(type: Encoder<T>) {
|
||||||
const msg = await read_recv();
|
const msg = read_pop !== null ? await read_pop() : null;
|
||||||
if (msg === null) throw new WireError(`connection closed`);
|
if (msg !== null) return ser_decode(type, msg_check_err(msg));
|
||||||
else return ser_decode(type, msg_check_err(msg));
|
else throw new WireError(`connection closed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function read_raw() {
|
async function read_msg() {
|
||||||
const msg = await read_recv();
|
const msg = read_pop !== null ? await read_pop() : null;
|
||||||
if (msg === null) throw new WireError(`connection closed`);
|
if (msg !== null) return msg;
|
||||||
else return msg;
|
else throw new WireError(`connection closed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function* read_socket() {
|
async function read_socket(socket: Deno.Conn, push: Sender<Uint8Array>) {
|
||||||
const buf = new Uint8Array(64 * 1024);
|
let err;
|
||||||
for (let n; (n = await socket.read(buf)) !== null; )
|
|
||||||
yield buf.subarray(0, n);
|
|
||||||
}
|
|
||||||
|
|
||||||
const read_recv = channel.receiver<Uint8Array>(async function read(send) {
|
|
||||||
let err: unknown;
|
|
||||||
try {
|
try {
|
||||||
let buf = new Uint8Array();
|
const header_size = 5;
|
||||||
for await (const chunk of read_socket()) {
|
const read_buf = new Uint8Array(64 * 1024); // shared buffer for all socket reads
|
||||||
buf = buf_concat_fast(buf, chunk);
|
let buf = new Uint8Array(); // concatenated messages read so far
|
||||||
|
|
||||||
for (let n; (n = ser_decode(Header, buf).length + 1) <= buf.length; ) {
|
for (let read; (read = await socket.read(read_buf)) !== null; ) {
|
||||||
const msg = buf.subarray(0, n);
|
buf = buf_concat_fast(buf, read_buf.subarray(0, read)); // push read bytes to buf
|
||||||
buf = buf.subarray(n);
|
while (buf.length >= header_size) {
|
||||||
|
const size = ser_decode(Header, buf).length + 1;
|
||||||
switch (msg_type(msg)) {
|
if (buf.length < size) break;
|
||||||
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC
|
const msg = buf.subarray(0, size); // shift one message from buf
|
||||||
case NoticeResponse.type: {
|
buf = buf.subarray(size);
|
||||||
const { fields } = ser_decode(NoticeResponse, msg);
|
if (!handle_msg(msg)) push(msg);
|
||||||
const notice = new PostgresError(fields);
|
|
||||||
log(severity_level(notice.severity), notice, notice.message);
|
|
||||||
wire.emit("notice", notice);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
case ParameterStatus.type: {
|
|
||||||
const { name, value } = ser_decode(ParameterStatus, msg);
|
|
||||||
const prev = params[name] ?? null;
|
|
||||||
Object.defineProperty(params, name, {
|
|
||||||
configurable: true,
|
|
||||||
enumerable: true,
|
|
||||||
value,
|
|
||||||
});
|
|
||||||
wire.emit("parameter", name, value, prev);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
case NotificationResponse.type: {
|
|
||||||
const { channel, payload, process_id } = ser_decode(
|
|
||||||
NotificationResponse,
|
|
||||||
msg
|
|
||||||
);
|
|
||||||
wire.emit("notify", channel, payload, process_id);
|
|
||||||
channels.get(channel)?.emit("notify", payload, process_id);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
send(msg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// there should be nothing left in buf if we gracefully exited
|
||||||
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
|
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw ((err = e), e);
|
throw (err = e);
|
||||||
} finally {
|
} finally {
|
||||||
wire.emit("close", err);
|
onclose(err);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
function write<T>(type: Encoder<T>, value: T) {
|
|
||||||
return write_raw(ser_encode(type, value));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function write_raw(buf: Uint8Array) {
|
function handle_msg(msg: Uint8Array) {
|
||||||
for (let i = 0, n = buf.length; i < n; )
|
switch (msg_type(msg)) {
|
||||||
i += await socket.write(buf.subarray(i));
|
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-ASYNC
|
||||||
|
case NoticeResponse.type: {
|
||||||
|
const { fields } = ser_decode(NoticeResponse, msg);
|
||||||
|
const notice = new PostgresError(fields);
|
||||||
|
log(severity_log_level(notice.severity), notice, notice.message);
|
||||||
|
wire.emit("notice", notice);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
case NotificationResponse.type: {
|
||||||
|
const { channel, payload, process_id } = ser_decode(
|
||||||
|
NotificationResponse,
|
||||||
|
msg
|
||||||
|
);
|
||||||
|
wire.emit("notify", channel, payload, process_id);
|
||||||
|
channels.get(channel)?.emit("notify", payload, process_id);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
case ParameterStatus.type: {
|
||||||
|
const { name, value } = ser_decode(ParameterStatus, msg);
|
||||||
|
const prev = params[name] ?? null;
|
||||||
|
Object.defineProperty(params, name, {
|
||||||
|
configurable: true,
|
||||||
|
enumerable: true,
|
||||||
|
value,
|
||||||
|
});
|
||||||
|
wire.emit("parameter", name, value, prev);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
function write<T>(type: Encoder<T>, value: T) {
|
||||||
|
write_msg(ser_encode(type, value));
|
||||||
|
}
|
||||||
|
|
||||||
|
function write_msg(buf: Uint8Array) {
|
||||||
|
if (write_push !== null) write_push(buf);
|
||||||
|
else throw new WireError(`connection closed`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function write_socket(socket: Deno.Conn, pop: Receiver<Uint8Array>) {
|
||||||
|
let err;
|
||||||
|
try {
|
||||||
|
for (let buf; (buf = await pop()) !== null; ) {
|
||||||
|
const bufs = [buf]; // proactively dequeue more queued msgs synchronously, if any
|
||||||
|
for (let i = 1, buf; (buf = pop.try()) !== null; ) bufs[i++] = buf;
|
||||||
|
if (bufs.length !== 1) buf = buf_concat(bufs); // write queued msgs concatenated, reduce write syscalls
|
||||||
|
for (let i = 0, n = buf.length; i < n; )
|
||||||
|
i += await socket.write(buf.subarray(i));
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
throw (err = e);
|
||||||
|
} finally {
|
||||||
|
onclose(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function connect() {
|
||||||
|
using _rlock = await rlock();
|
||||||
|
using _wlock = await wlock();
|
||||||
|
if (connected) return;
|
||||||
|
try {
|
||||||
|
const s = (socket = await socket_connect(host, port));
|
||||||
|
read_pop = channel.receiver((push) => read_socket(s, push));
|
||||||
|
write_push = channel.sender((pop) => write_socket(s, pop));
|
||||||
|
await handle_auth(); // run auth with rw lock
|
||||||
|
(connected = true), (should_reconnect = reconnect_delay !== 0);
|
||||||
|
} catch (e) {
|
||||||
|
throw (close(e), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function reconnect() {
|
||||||
|
connect().catch((err) => {
|
||||||
|
log("warn", err as Error, `reconnect failed`);
|
||||||
|
setTimeout(reconnect, reconnect_delay);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function close(reason?: unknown) {
|
function close(reason?: unknown) {
|
||||||
socket.close(), read_recv.close(reason);
|
(should_reconnect = false), onclose(reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
function onclose(reason?: unknown) {
|
||||||
|
if (!connected) return;
|
||||||
|
else connected = false;
|
||||||
|
socket?.close(), (socket = null);
|
||||||
|
read_pop?.close(reason), (read_pop = null);
|
||||||
|
write_push?.close(reason), (write_push = null);
|
||||||
|
for (const name of Object.keys(params))
|
||||||
|
delete (params as Record<string, string>)[name];
|
||||||
|
st_cache.clear(), (st_ids = 0);
|
||||||
|
(tx_status = "I"), (tx_stack.length = 0);
|
||||||
|
should_reconnect &&= (setTimeout(reconnect, reconnect_delay), false);
|
||||||
|
wire.emit("close", reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
|
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING
|
||||||
const rlock = semaphore_fast();
|
const rlock = semaphore();
|
||||||
const wlock = semaphore_fast();
|
const wlock = semaphore();
|
||||||
|
|
||||||
function pipeline<T>(
|
function pipeline<T>(
|
||||||
w: () => void | PromiseLike<void>,
|
w: () => void | PromiseLike<void>,
|
||||||
@@ -691,39 +760,38 @@ function wire_impl(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function pipeline_read<T>(r: () => T | PromiseLike<T>) {
|
async function pipeline_read<T>(r: () => T | PromiseLike<T>) {
|
||||||
return rlock(async function pipeline_read() {
|
using _lock = await rlock();
|
||||||
|
try {
|
||||||
|
return await r();
|
||||||
|
} finally {
|
||||||
try {
|
try {
|
||||||
return await r();
|
let msg;
|
||||||
} finally {
|
while (msg_type((msg = await read_msg())) !== ReadyForQuery.type);
|
||||||
try {
|
({ tx_status } = ser_decode(ReadyForQuery, msg));
|
||||||
let msg;
|
} catch {
|
||||||
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type);
|
// ignored
|
||||||
({ tx_status } = ser_decode(ReadyForQuery, msg));
|
|
||||||
} catch {
|
|
||||||
// ignored
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function pipeline_write<T>(w: () => T | PromiseLike<T>) {
|
async function pipeline_write<T>(w: () => T | PromiseLike<T>) {
|
||||||
return wlock(async function pipeline_write() {
|
using _lock = await wlock();
|
||||||
|
try {
|
||||||
|
return await w();
|
||||||
|
} finally {
|
||||||
try {
|
try {
|
||||||
return await w();
|
write(Sync, {});
|
||||||
} finally {
|
} catch {
|
||||||
try {
|
// ignored
|
||||||
await write(Sync, {});
|
|
||||||
} catch {
|
|
||||||
// ignored
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP
|
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP
|
||||||
async function auth() {
|
async function handle_auth() {
|
||||||
await write(StartupMessage, {
|
// always run within rw lock (see connect())
|
||||||
|
write(StartupMessage, {
|
||||||
version: 196608,
|
version: 196608,
|
||||||
params: {
|
params: {
|
||||||
application_name: "pglue",
|
application_name: "pglue",
|
||||||
@@ -738,7 +806,7 @@ function wire_impl(
|
|||||||
});
|
});
|
||||||
|
|
||||||
auth: for (;;) {
|
auth: for (;;) {
|
||||||
const msg = msg_check_err(await read_raw());
|
const msg = msg_check_err(await read_msg());
|
||||||
switch (msg_type(msg)) {
|
switch (msg_type(msg)) {
|
||||||
case NegotiateProtocolVersion.type: {
|
case NegotiateProtocolVersion.type: {
|
||||||
const { bad_options } = ser_decode(NegotiateProtocolVersion, msg);
|
const { bad_options } = ser_decode(NegotiateProtocolVersion, msg);
|
||||||
@@ -756,7 +824,7 @@ function wire_impl(
|
|||||||
throw new WireError(`kerberos authentication is deprecated`);
|
throw new WireError(`kerberos authentication is deprecated`);
|
||||||
|
|
||||||
case 3: // AuthenticationCleartextPassword
|
case 3: // AuthenticationCleartextPassword
|
||||||
await write(PasswordMessage, { password });
|
write(PasswordMessage, { password });
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
case 5: // AuthenticationMD5Password
|
case 5: // AuthenticationMD5Password
|
||||||
@@ -772,7 +840,7 @@ function wire_impl(
|
|||||||
|
|
||||||
// AuthenticationSASL
|
// AuthenticationSASL
|
||||||
case 10:
|
case 10:
|
||||||
await auth_sasl();
|
await handle_auth_sasl();
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@@ -780,8 +848,9 @@ function wire_impl(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait for ready
|
||||||
ready: for (;;) {
|
ready: for (;;) {
|
||||||
const msg = msg_check_err(await read_raw());
|
const msg = msg_check_err(await read_msg());
|
||||||
switch (msg_type(msg)) {
|
switch (msg_type(msg)) {
|
||||||
case BackendKeyData.type:
|
case BackendKeyData.type:
|
||||||
continue; // ignored
|
continue; // ignored
|
||||||
@@ -791,11 +860,18 @@ function wire_impl(
|
|||||||
break ready;
|
break ready;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// re-listen previously registered channels
|
||||||
|
await Promise.all(
|
||||||
|
channels
|
||||||
|
.keys()
|
||||||
|
.map((name) => query(sql`listen ${sql.ident(name)}`).execute())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// https://www.postgresql.org/docs/current/sasl-authentication.html#SASL-SCRAM-SHA-256
|
// https://www.postgresql.org/docs/current/sasl-authentication.html#SASL-SCRAM-SHA-256
|
||||||
// https://datatracker.ietf.org/doc/html/rfc5802
|
// https://datatracker.ietf.org/doc/html/rfc5802
|
||||||
async function auth_sasl() {
|
async function handle_auth_sasl() {
|
||||||
const bits = 256;
|
const bits = 256;
|
||||||
const hash = `SHA-${bits}`;
|
const hash = `SHA-${bits}`;
|
||||||
const mechanism = `SCRAM-${hash}`;
|
const mechanism = `SCRAM-${hash}`;
|
||||||
@@ -852,7 +928,7 @@ function wire_impl(
|
|||||||
)}`;
|
)}`;
|
||||||
const client_first_message_bare = `${username},${initial_nonce}`;
|
const client_first_message_bare = `${username},${initial_nonce}`;
|
||||||
const client_first_message = `${gs2_header}${client_first_message_bare}`;
|
const client_first_message = `${gs2_header}${client_first_message_bare}`;
|
||||||
await write(SASLInitialResponse, { mechanism, data: client_first_message });
|
write(SASLInitialResponse, { mechanism, data: client_first_message });
|
||||||
|
|
||||||
const server_first_message_str = from_utf8(
|
const server_first_message_str = from_utf8(
|
||||||
(await read(AuthenticationSASLContinue)).data
|
(await read(AuthenticationSASLContinue)).data
|
||||||
@@ -871,7 +947,7 @@ function wire_impl(
|
|||||||
const client_proof = buf_xor(client_key, client_signature);
|
const client_proof = buf_xor(client_key, client_signature);
|
||||||
const proof = `p=${to_base64(client_proof)}`;
|
const proof = `p=${to_base64(client_proof)}`;
|
||||||
const client_final_message = `${client_final_message_without_proof},${proof}`;
|
const client_final_message = `${client_final_message_without_proof},${proof}`;
|
||||||
await write(SASLResponse, { data: client_final_message });
|
write(SASLResponse, { data: client_final_message });
|
||||||
|
|
||||||
const server_key = await hmac(salted_password, "Server Key");
|
const server_key = await hmac(salted_password, "Server Key");
|
||||||
const server_signature = await hmac(server_key, auth_message);
|
const server_signature = await hmac(server_key, auth_message);
|
||||||
@@ -891,47 +967,44 @@ function wire_impl(
|
|||||||
readonly name = `__st${st_ids++}`;
|
readonly name = `__st${st_ids++}`;
|
||||||
constructor(readonly query: string) {}
|
constructor(readonly query: string) {}
|
||||||
|
|
||||||
parse_task: Promise<{
|
#parse_task: Promise<{
|
||||||
ser_params: ParameterSerializer;
|
ser_params: ParameterSerializer;
|
||||||
Row: RowConstructor;
|
Row: RowConstructor;
|
||||||
}> | null = null;
|
}> | null = null;
|
||||||
|
|
||||||
parse() {
|
parse() {
|
||||||
return (this.parse_task ??= this.#parse());
|
return (this.#parse_task ??= this.#parse());
|
||||||
}
|
}
|
||||||
|
|
||||||
async #parse() {
|
async #parse() {
|
||||||
try {
|
try {
|
||||||
const { name, query } = this;
|
const { name, query } = this;
|
||||||
return await pipeline(
|
return await pipeline(
|
||||||
async () => {
|
() => {
|
||||||
await write(Parse, { statement: name, query, param_types: [] });
|
write(Parse, { statement: name, query, param_types: [] });
|
||||||
await write(Describe, { which: "S", name });
|
write(Describe, { which: "S", name });
|
||||||
},
|
},
|
||||||
async () => {
|
async () => {
|
||||||
await read(ParseComplete);
|
await read(ParseComplete);
|
||||||
const param_desc = await read(ParameterDescription);
|
const ser_params = make_param_ser(await read(ParameterDescription));
|
||||||
|
|
||||||
const msg = msg_check_err(await read_raw());
|
const msg = msg_check_err(await read_msg());
|
||||||
const row_desc =
|
const Row =
|
||||||
msg_type(msg) === NoData.type
|
msg_type(msg) === NoData.type
|
||||||
? { columns: [] }
|
? EmptyRow
|
||||||
: ser_decode(RowDescription, msg);
|
: make_row_ctor(ser_decode(RowDescription, msg));
|
||||||
|
|
||||||
return {
|
return { ser_params, Row };
|
||||||
ser_params: make_param_ser(param_desc),
|
|
||||||
Row: make_row_ctor(row_desc),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw ((this.parse_task = null), e);
|
throw ((this.#parse_task = null), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
portals = 0;
|
#portals = 0;
|
||||||
portal() {
|
portal() {
|
||||||
return `${this.name}_${this.portals++}`;
|
return `${this.name}_${this.#portals++}`;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -940,6 +1013,7 @@ function wire_impl(
|
|||||||
(params: unknown[]): (string | null)[];
|
(params: unknown[]): (string | null)[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// makes function to serialize query parameters
|
||||||
function make_param_ser({ param_types }: ParameterDescription) {
|
function make_param_ser({ param_types }: ParameterDescription) {
|
||||||
return jit.compiled<ParameterSerializer>`function ser_params(xs) {
|
return jit.compiled<ParameterSerializer>`function ser_params(xs) {
|
||||||
return [
|
return [
|
||||||
@@ -956,6 +1030,8 @@ function wire_impl(
|
|||||||
new (columns: (BinaryLike | null)[]): Row;
|
new (columns: (BinaryLike | null)[]): Row;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// makes function to create Row objects
|
||||||
|
const EmptyRow = make_row_ctor({ columns: [] });
|
||||||
function make_row_ctor({ columns }: RowDescription) {
|
function make_row_ctor({ columns }: RowDescription) {
|
||||||
const Row = jit.compiled<RowConstructor>`function Row(xs) {
|
const Row = jit.compiled<RowConstructor>`function Row(xs) {
|
||||||
${jit.map(" ", columns, ({ name, type_oid }, i) => {
|
${jit.map(" ", columns, ({ name, type_oid }, i) => {
|
||||||
@@ -993,7 +1069,7 @@ function wire_impl(
|
|||||||
stdout: WritableStream<Uint8Array> | null
|
stdout: WritableStream<Uint8Array> | null
|
||||||
) {
|
) {
|
||||||
for (let rows = [], i = 0; ; ) {
|
for (let rows = [], i = 0; ; ) {
|
||||||
const msg = msg_check_err(await read_raw());
|
const msg = msg_check_err(await read_msg());
|
||||||
switch (msg_type(msg)) {
|
switch (msg_type(msg)) {
|
||||||
default:
|
default:
|
||||||
case DataRow.type:
|
case DataRow.type:
|
||||||
@@ -1011,11 +1087,15 @@ function wire_impl(
|
|||||||
case EmptyQueryResponse.type:
|
case EmptyQueryResponse.type:
|
||||||
return { done: true as const, rows, tag: "" };
|
return { done: true as const, rows, tag: "" };
|
||||||
|
|
||||||
|
case RowDescription.type:
|
||||||
|
Row = make_row_ctor(ser_decode(RowDescription, msg));
|
||||||
|
continue;
|
||||||
|
|
||||||
case CopyInResponse.type:
|
case CopyInResponse.type:
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
case CopyOutResponse.type:
|
case CopyOutResponse.type:
|
||||||
await read_copy_out(stdout);
|
await read_copy_out(stdout), (stdout = null);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1025,7 +1105,7 @@ function wire_impl(
|
|||||||
if (stream !== null) {
|
if (stream !== null) {
|
||||||
const writer = stream.getWriter();
|
const writer = stream.getWriter();
|
||||||
try {
|
try {
|
||||||
for (let msg; msg_type((msg = await read_raw())) !== CopyDone.type; ) {
|
for (let msg; msg_type((msg = await read_msg())) !== CopyDone.type; ) {
|
||||||
const { data } = ser_decode(CopyData, msg_check_err(msg));
|
const { data } = ser_decode(CopyData, msg_check_err(msg));
|
||||||
await writer.write(to_utf8(data));
|
await writer.write(to_utf8(data));
|
||||||
}
|
}
|
||||||
@@ -1037,7 +1117,7 @@ function wire_impl(
|
|||||||
writer.releaseLock();
|
writer.releaseLock();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
while (msg_type(msg_check_err(await read_raw())) !== CopyDone.type);
|
while (msg_type(msg_check_err(await read_msg())) !== CopyDone.type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1046,31 +1126,71 @@ function wire_impl(
|
|||||||
const reader = stream.getReader();
|
const reader = stream.getReader();
|
||||||
try {
|
try {
|
||||||
for (let next; !(next = await reader.read()).done; )
|
for (let next; !(next = await reader.read()).done; )
|
||||||
await write(CopyData, { data: next.value });
|
write(CopyData, { data: next.value });
|
||||||
await write(CopyDone, {});
|
write(CopyDone, {});
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await write(CopyFail, { cause: String(e) });
|
write(CopyFail, { cause: String(e) });
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
reader.releaseLock();
|
reader.releaseLock();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
await write(CopyDone, {});
|
write(CopyDone, {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function* execute_simple(
|
||||||
|
query: string,
|
||||||
|
stdin: ReadableStream<Uint8Array> | null,
|
||||||
|
stdout: WritableStream<Uint8Array> | null
|
||||||
|
): ResultStream<Row> {
|
||||||
|
yield* await pipeline(
|
||||||
|
() => {
|
||||||
|
log("debug", { query }, `executing simple query`);
|
||||||
|
write(QueryMessage, { query });
|
||||||
|
write_copy_in(stdin);
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
for (let chunks = [], err; ; ) {
|
||||||
|
const msg = await read_msg();
|
||||||
|
switch (msg_type(msg)) {
|
||||||
|
default:
|
||||||
|
case ReadyForQuery.type:
|
||||||
|
if (err) throw err;
|
||||||
|
else return chunks;
|
||||||
|
|
||||||
|
case RowDescription.type: {
|
||||||
|
const Row = make_row_ctor(ser_decode(RowDescription, msg));
|
||||||
|
const { rows } = await read_rows(Row, stdout);
|
||||||
|
chunks.push(rows);
|
||||||
|
stdout = null;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
case EmptyQueryResponse.type:
|
||||||
|
case CommandComplete.type:
|
||||||
|
continue;
|
||||||
|
|
||||||
|
case ErrorResponse.type: {
|
||||||
|
const { fields } = ser_decode(ErrorResponse, msg);
|
||||||
|
err = new PostgresError(fields);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return { tag: "" };
|
||||||
|
}
|
||||||
|
|
||||||
async function* execute_fast(
|
async function* execute_fast(
|
||||||
st: Statement,
|
st: Statement,
|
||||||
params: unknown[],
|
params: unknown[],
|
||||||
stdin: ReadableStream<Uint8Array> | null,
|
stdin: ReadableStream<Uint8Array> | null,
|
||||||
stdout: WritableStream<Uint8Array> | null
|
stdout: WritableStream<Uint8Array> | null
|
||||||
): ResultStream<Row> {
|
): ResultStream<Row> {
|
||||||
log(
|
const { query, name: statement } = st;
|
||||||
"debug",
|
|
||||||
{ query: st.query, statement: st.name, params },
|
|
||||||
`executing query`
|
|
||||||
);
|
|
||||||
|
|
||||||
const { ser_params, Row } = await st.parse();
|
const { ser_params, Row } = await st.parse();
|
||||||
const param_values = ser_params(params);
|
const param_values = ser_params(params);
|
||||||
const portal = st.portal();
|
const portal = st.portal();
|
||||||
@@ -1078,16 +1198,17 @@ function wire_impl(
|
|||||||
try {
|
try {
|
||||||
const { rows, tag } = await pipeline(
|
const { rows, tag } = await pipeline(
|
||||||
async () => {
|
async () => {
|
||||||
await write(Bind, {
|
log("debug", { query, statement, params }, `executing query`);
|
||||||
|
write(Bind, {
|
||||||
portal,
|
portal,
|
||||||
statement: st.name,
|
statement: st.name,
|
||||||
param_formats: [],
|
param_formats: [],
|
||||||
param_values,
|
param_values,
|
||||||
column_formats: [],
|
column_formats: [],
|
||||||
});
|
});
|
||||||
await write(Execute, { portal, row_limit: 0 });
|
write(Execute, { portal, row_limit: 0 });
|
||||||
await write_copy_in(stdin);
|
await write_copy_in(stdin);
|
||||||
await write(Close, { which: "P" as const, name: portal });
|
write(Close, { which: "P", name: portal });
|
||||||
},
|
},
|
||||||
async () => {
|
async () => {
|
||||||
await read(BindComplete);
|
await read(BindComplete);
|
||||||
@@ -1100,7 +1221,7 @@ function wire_impl(
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
try {
|
try {
|
||||||
await pipeline(
|
await pipeline(
|
||||||
() => write(Close, { which: "P" as const, name: portal }),
|
() => write(Close, { which: "P", name: portal }),
|
||||||
() => read(CloseComplete)
|
() => read(CloseComplete)
|
||||||
);
|
);
|
||||||
} catch {
|
} catch {
|
||||||
@@ -1118,28 +1239,24 @@ function wire_impl(
|
|||||||
stdin: ReadableStream<Uint8Array> | null,
|
stdin: ReadableStream<Uint8Array> | null,
|
||||||
stdout: WritableStream<Uint8Array> | null
|
stdout: WritableStream<Uint8Array> | null
|
||||||
): ResultStream<Row> {
|
): ResultStream<Row> {
|
||||||
log(
|
const { query, name: statement } = st;
|
||||||
"debug",
|
|
||||||
{ query: st.query, statement: st.name, params },
|
|
||||||
`executing chunked query`
|
|
||||||
);
|
|
||||||
|
|
||||||
const { ser_params, Row } = await st.parse();
|
const { ser_params, Row } = await st.parse();
|
||||||
const param_values = ser_params(params);
|
const param_values = ser_params(params);
|
||||||
const portal = st.portal();
|
const portal = st.portal();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let { done, rows, tag } = await pipeline(
|
let { done, rows, tag } = await pipeline(
|
||||||
async () => {
|
() => {
|
||||||
await write(Bind, {
|
log("debug", { query, statement, params }, `executing chunked query`);
|
||||||
|
write(Bind, {
|
||||||
portal,
|
portal,
|
||||||
statement: st.name,
|
statement: st.name,
|
||||||
param_formats: [],
|
param_formats: [],
|
||||||
param_values,
|
param_values,
|
||||||
column_formats: [],
|
column_formats: [],
|
||||||
});
|
});
|
||||||
await write(Execute, { portal, row_limit: chunk_size });
|
write(Execute, { portal, row_limit: chunk_size });
|
||||||
await write_copy_in(stdin);
|
return write_copy_in(stdin);
|
||||||
},
|
},
|
||||||
async () => {
|
async () => {
|
||||||
await read(BindComplete);
|
await read(BindComplete);
|
||||||
@@ -1161,21 +1278,26 @@ function wire_impl(
|
|||||||
return { tag };
|
return { tag };
|
||||||
} finally {
|
} finally {
|
||||||
await pipeline(
|
await pipeline(
|
||||||
() => write(Close, { which: "P" as const, name: portal }),
|
() => write(Close, { which: "P", name: portal }),
|
||||||
() => read(CloseComplete)
|
() => read(CloseComplete)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function query(s: SqlFragment) {
|
function query(sql: SqlFragment) {
|
||||||
const { query, params } = sql.format(s);
|
return new Query(
|
||||||
let st = st_cache.get(query);
|
({ simple = false, chunk_size = 0, stdin = null, stdout = null }) => {
|
||||||
if (!st) st_cache.set(query, (st = new Statement(query)));
|
const { query, params } = format(sql);
|
||||||
|
if (simple) {
|
||||||
|
if (!params.length) return execute_simple(query, stdin, stdout);
|
||||||
|
else throw new WireError(`simple query cannot be parameterised`);
|
||||||
|
}
|
||||||
|
|
||||||
return new Query(({ chunk_size = 0, stdin = null, stdout = null }) =>
|
let st = st_cache.get(query);
|
||||||
chunk_size !== 0
|
if (!st) st_cache.set(query, (st = new Statement(query)));
|
||||||
? execute_chunked(st, params, chunk_size, stdin, stdout)
|
if (!chunk_size) return execute_fast(st, params, stdin, stdout);
|
||||||
: execute_fast(st, params, stdin, stdout)
|
else return execute_chunked(st, params, chunk_size, stdin, stdout);
|
||||||
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1284,7 +1406,7 @@ function wire_impl(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return { params, auth, query, begin, listen, notify, close };
|
return { params, connect, query, begin, listen, notify, close };
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface PoolOptions extends WireOptions {
|
export interface PoolOptions extends WireOptions {
|
||||||
@@ -1296,7 +1418,7 @@ export type PoolEvents = {
|
|||||||
log(level: LogLevel, ctx: object, msg: string): void;
|
log(level: LogLevel, ctx: object, msg: string): void;
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface PoolWire extends Wire {
|
export interface PoolWire<V extends WireEvents = WireEvents> extends Wire<V> {
|
||||||
readonly connection_id: number;
|
readonly connection_id: number;
|
||||||
readonly borrowed: boolean;
|
readonly borrowed: boolean;
|
||||||
release(): void;
|
release(): void;
|
||||||
@@ -1306,8 +1428,8 @@ export interface PoolTransaction extends Transaction {
|
|||||||
readonly wire: PoolWire;
|
readonly wire: PoolWire;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Pool
|
export class Pool<V extends PoolEvents = PoolEvents>
|
||||||
extends TypedEmitter<PoolEvents>
|
extends TypedEmitter<V>
|
||||||
implements PromiseLike<PoolWire>, Disposable
|
implements PromiseLike<PoolWire>, Disposable
|
||||||
{
|
{
|
||||||
readonly #acquire;
|
readonly #acquire;
|
||||||
@@ -1443,9 +1565,8 @@ function pool_impl(
|
|||||||
};
|
};
|
||||||
|
|
||||||
async function connect() {
|
async function connect() {
|
||||||
const { host, port } = options;
|
const wire = new PoolWire(options);
|
||||||
const wire = new PoolWire(await socket_connect(host, port), options);
|
await wire.connect(), all.add(wire);
|
||||||
await wire.connected, all.add(wire);
|
|
||||||
const { connection_id } = wire;
|
const { connection_id } = wire;
|
||||||
return wire
|
return wire
|
||||||
.on("log", (l, c, s) => pool.emit("log", l, { ...c, connection_id }, s))
|
.on("log", (l, c, s) => pool.emit("log", l, { ...c, connection_id }, s))
|
||||||
|
|||||||
Reference in New Issue
Block a user