Compare commits
16 Commits
654c564aff
...
v0.1.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
3793e14f50
|
|||
|
b194397645
|
|||
|
858b7a95f3
|
|||
|
a88da00dec
|
|||
|
cefe14b9dc
|
|||
|
4e68e34fd0
|
|||
|
826190ecc9
|
|||
|
72749e5841
|
|||
|
b9829bc70d
|
|||
|
9eecf29bc5
|
|||
|
8964cb342e
|
|||
|
eeed8b2f66
|
|||
|
b72c548c33
|
|||
|
60899d1a41
|
|||
|
bdebb22a0e
|
|||
|
5dadd7c5a2
|
103
README.md
103
README.md
@@ -1,16 +1,37 @@
|
||||
# pglue
|
||||
|
||||
## Performance
|
||||
The glue for TypeScript to PostgreSQL.
|
||||
|
||||
pglue implements automatic query pipelining which makes it especially performant with many queries concurrently executed on a single connection.
|
||||
## Overview
|
||||
|
||||
- 🌟 [High performance](#benchmarks), fully asynchronous, written in modern TypeScript
|
||||
- 🐢 First class Deno support
|
||||
- 💬 Automatic query parameterisation
|
||||
- 🌧️ Automatic query pipelining
|
||||
- 📣 Listen/notify support
|
||||
- 📤 Connection pool support
|
||||
|
||||
## Installation
|
||||
|
||||
```ts
|
||||
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/v0.1.3/mod.ts";
|
||||
// ...or from github:
|
||||
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/v0.1.3/mod.ts";
|
||||
```
|
||||
|
||||
## Documentation
|
||||
|
||||
TODO: Write the documentation in more detail here.
|
||||
|
||||
## Benchmarks
|
||||
|
||||
Performance is generally on par with [postgres.js][1] and up to **4x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
|
||||
Performance is generally on par with [postgres-js][1] and up to **5x faster** than [deno-postgres][2]. Keep in mind that database driver benchmarks are largely dependent on the database performance itself and does not necessarily represent accurate real-world performance.
|
||||
|
||||
Tested on a 4 core 2800 MHz x86_64-pc-linux-gnu QEMU VM with Deno 2.1.4 and local PostgreSQL 17.1 installation connected via TCP on localhost:
|
||||
Tested on a 4 core, 2800 MHz, x86_64-pc-linux-gnu, QEMU VM, with Deno 2.1.4 and PostgreSQL 17.1 on localhost:
|
||||
|
||||
```
|
||||
Query `select * from pg_type`:
|
||||
|
||||
```log
|
||||
CPU | Common KVM Processor v2.0
|
||||
Runtime | Deno 2.1.4 (x86_64-unknown-linux-gnu)
|
||||
|
||||
@@ -18,74 +39,78 @@ benchmark time/iter (avg) iter/s (min … max) p75
|
||||
--------------- ----------------------------- --------------------- --------------------------
|
||||
|
||||
group select n=1
|
||||
pglue 9.9 ms 101.1 ( 7.9 ms … 17.8 ms) 10.2 ms 17.8 ms 17.8 ms
|
||||
postgres.js 8.8 ms 114.2 ( 7.0 ms … 9.5 ms) 9.1 ms 9.5 ms 9.5 ms
|
||||
deno-postgres 37.4 ms 26.7 ( 25.3 ms … 42.8 ms) 39.2 ms 42.8 ms 42.8 ms
|
||||
pglue 8.3 ms 120.4 ( 7.2 ms … 14.4 ms) 8.5 ms 14.4 ms 14.4 ms
|
||||
postgres-js 10.8 ms 92.3 ( 8.1 ms … 26.5 ms) 10.7 ms 26.5 ms 26.5 ms
|
||||
deno-postgres 37.1 ms 26.9 ( 33.4 ms … 41.3 ms) 38.5 ms 41.3 ms 41.3 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.13x slower than postgres.js
|
||||
3.78x faster than deno-postgres
|
||||
1.30x faster than postgres-js
|
||||
4.47x faster than deno-postgres
|
||||
|
||||
group select n=5
|
||||
pglue 48.2 ms 20.8 ( 41.9 ms … 68.5 ms) 50.3 ms 68.5 ms 68.5 ms
|
||||
postgres.js 43.6 ms 22.9 ( 38.1 ms … 57.3 ms) 48.6 ms 57.3 ms 57.3 ms
|
||||
deno-postgres 186.5 ms 5.4 (138.4 ms … 213.2 ms) 193.6 ms 213.2 ms 213.2 ms
|
||||
pglue 39.9 ms 25.1 ( 37.2 ms … 49.6 ms) 40.8 ms 49.6 ms 49.6 ms
|
||||
postgres-js 42.4 ms 23.6 ( 36.5 ms … 61.8 ms) 44.2 ms 61.8 ms 61.8 ms
|
||||
deno-postgres 182.5 ms 5.5 (131.9 ms … 211.8 ms) 193.4 ms 211.8 ms 211.8 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.11x slower than postgres.js
|
||||
3.87x faster than deno-postgres
|
||||
1.06x faster than postgres-js
|
||||
4.57x faster than deno-postgres
|
||||
|
||||
group select n=10
|
||||
pglue 97.8 ms 10.2 ( 90.2 ms … 105.0 ms) 104.0 ms 105.0 ms 105.0 ms
|
||||
postgres.js 93.8 ms 10.7 ( 80.9 ms … 107.7 ms) 106.1 ms 107.7 ms 107.7 ms
|
||||
deno-postgres 333.9 ms 3.0 (205.6 ms … 394.9 ms) 377.4 ms 394.9 ms 394.9 ms
|
||||
pglue 78.9 ms 12.7 ( 72.3 ms … 88.9 ms) 82.5 ms 88.9 ms 88.9 ms
|
||||
postgres-js 92.0 ms 10.9 ( 77.6 ms … 113.6 ms) 101.2 ms 113.6 ms 113.6 ms
|
||||
deno-postgres 326.6 ms 3.1 (208.8 ms … 406.0 ms) 388.8 ms 406.0 ms 406.0 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.04x slower than postgres.js
|
||||
3.42x faster than deno-postgres
|
||||
1.17x faster than postgres-js
|
||||
4.14x faster than deno-postgres
|
||||
```
|
||||
|
||||
Query `insert into my_table (a, b, c) values (${a}, ${b}, ${c})`:
|
||||
|
||||
```log
|
||||
group insert n=1
|
||||
pglue 237.5 µs 4,210 (143.9 µs … 1.3 ms) 249.2 µs 953.3 µs 1.3 ms
|
||||
postgres.js 242.5 µs 4,124 (137.4 µs … 886.4 µs) 263.4 µs 762.8 µs 865.5 µs
|
||||
deno-postgres 295.1 µs 3,389 (163.8 µs … 899.3 µs) 340.0 µs 641.7 µs 899.3 µs
|
||||
pglue 303.3 µs 3,297 (165.6 µs … 2.4 ms) 321.6 µs 1.1 ms 2.4 ms
|
||||
postgres-js 260.4 µs 3,840 (132.9 µs … 2.7 ms) 276.4 µs 1.1 ms 2.7 ms
|
||||
deno-postgres 281.6 µs 3,552 (186.1 µs … 1.5 ms) 303.8 µs 613.6 µs 791.8 µs
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.02x faster than postgres.js
|
||||
1.24x faster than deno-postgres
|
||||
1.17x slower than postgres-js
|
||||
1.08x slower than deno-postgres
|
||||
|
||||
group insert n=10
|
||||
pglue 1.1 ms 869.6 (610.1 µs … 2.1 ms) 1.2 ms 2.0 ms 2.1 ms
|
||||
postgres.js 755.9 µs 1,323 (387.6 µs … 4.7 ms) 805.4 µs 2.8 ms 4.7 ms
|
||||
deno-postgres 2.3 ms 434.4 ( 1.6 ms … 10.6 ms) 2.4 ms 6.5 ms 10.6 ms
|
||||
pglue 1.1 ms 878.5 (605.5 µs … 3.2 ms) 1.1 ms 2.2 ms 3.2 ms
|
||||
postgres-js 849.3 µs 1,177 (529.5 µs … 10.1 ms) 770.6 µs 3.0 ms 10.1 ms
|
||||
deno-postgres 2.3 ms 439.4 ( 1.4 ms … 4.9 ms) 2.5 ms 4.1 ms 4.9 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.52x slower than postgres.js
|
||||
1.34x slower than postgres-js
|
||||
2.00x faster than deno-postgres
|
||||
|
||||
group insert n=100
|
||||
pglue 9.2 ms 109.0 ( 5.5 ms … 15.6 ms) 10.4 ms 15.6 ms 15.6 ms
|
||||
postgres.js 14.8 ms 67.4 ( 9.6 ms … 35.8 ms) 16.6 ms 35.8 ms 35.8 ms
|
||||
deno-postgres 18.8 ms 53.1 ( 14.5 ms … 25.8 ms) 20.9 ms 25.8 ms 25.8 ms
|
||||
pglue 8.3 ms 121.0 ( 5.0 ms … 13.6 ms) 9.3 ms 13.6 ms 13.6 ms
|
||||
postgres-js 13.0 ms 76.7 ( 9.0 ms … 26.9 ms) 14.1 ms 26.9 ms 26.9 ms
|
||||
deno-postgres 19.8 ms 50.5 ( 14.2 ms … 31.8 ms) 22.5 ms 31.8 ms 31.8 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.62x faster than postgres.js
|
||||
2.05x faster than deno-postgres
|
||||
1.58x faster than postgres-js
|
||||
2.40x faster than deno-postgres
|
||||
|
||||
group insert n=200
|
||||
pglue 15.0 ms 66.6 ( 11.1 ms … 19.0 ms) 16.7 ms 19.0 ms 19.0 ms
|
||||
postgres.js 28.1 ms 35.6 ( 22.8 ms … 40.0 ms) 29.1 ms 40.0 ms 40.0 ms
|
||||
deno-postgres 35.9 ms 27.9 ( 29.7 ms … 46.5 ms) 37.2 ms 46.5 ms 46.5 ms
|
||||
pglue 15.1 ms 66.2 ( 9.4 ms … 21.1 ms) 16.8 ms 21.1 ms 21.1 ms
|
||||
postgres-js 27.8 ms 36.0 ( 22.5 ms … 39.2 ms) 30.2 ms 39.2 ms 39.2 ms
|
||||
deno-postgres 40.6 ms 24.6 ( 33.5 ms … 51.4 ms) 42.2 ms 51.4 ms 51.4 ms
|
||||
|
||||
summary
|
||||
pglue
|
||||
1.87x faster than postgres.js
|
||||
2.39x faster than deno-postgres
|
||||
1.84x faster than postgres-js
|
||||
2.68x faster than deno-postgres
|
||||
```
|
||||
|
||||
[1]: https://github.com/porsager/postgres
|
||||
|
||||
6
bench.ts
6
bench.ts
@@ -1,4 +1,4 @@
|
||||
import * as pglue from "./mod.ts";
|
||||
import pglue from "./mod.ts";
|
||||
import postgres_js from "https://deno.land/x/postgresjs/mod.js";
|
||||
import * as deno_postgres from "https://deno.land/x/postgres/mod.ts";
|
||||
|
||||
@@ -60,7 +60,7 @@ for (const n of [1, 5, 10]) {
|
||||
});
|
||||
|
||||
Deno.bench({
|
||||
name: `postgres.js`,
|
||||
name: `postgres-js`,
|
||||
group: `select n=${n}`,
|
||||
async fn(b) {
|
||||
await bench_select(b, n, () => c_pgjs`select * from pg_type`);
|
||||
@@ -95,7 +95,7 @@ for (const n of [1, 10, 100, 200]) {
|
||||
});
|
||||
|
||||
Deno.bench({
|
||||
name: `postgres.js`,
|
||||
name: `postgres-js`,
|
||||
group: `insert n=${n}`,
|
||||
async fn(b) {
|
||||
await c_pgjs`begin`;
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
{
|
||||
"name": "@luaneko/pglue",
|
||||
"version": "0.1.0",
|
||||
"exports": "./mod.ts",
|
||||
"tasks": {
|
||||
"test": "deno run --watch -A mod_test.ts",
|
||||
"bench": "deno bench --watch -A"
|
||||
}
|
||||
"version": "0.1.3",
|
||||
"exports": "./mod.ts"
|
||||
}
|
||||
|
||||
45
deno.lock
generated
45
deno.lock
generated
@@ -2,23 +2,50 @@
|
||||
"version": "4",
|
||||
"specifiers": {
|
||||
"jsr:@badrap/valita@~0.4.2": "0.4.2",
|
||||
"jsr:@std/assert@^1.0.10": "1.0.10",
|
||||
"jsr:@std/bytes@^1.0.3": "1.0.4",
|
||||
"jsr:@std/bytes@^1.0.4": "1.0.4",
|
||||
"jsr:@std/encoding@^1.0.6": "1.0.6",
|
||||
"jsr:@std/expect@*": "1.0.11",
|
||||
"jsr:@std/internal@^1.0.5": "1.0.5",
|
||||
"jsr:@std/path@^1.0.8": "1.0.8",
|
||||
"jsr:@std/streams@*": "1.0.8",
|
||||
"npm:pg-connection-string@^2.7.0": "2.7.0"
|
||||
},
|
||||
"jsr": {
|
||||
"@badrap/valita@0.4.2": {
|
||||
"integrity": "af8a829e82eac71adbc7b60352798f94dcc66d19fab16b657957ca9e646c25fd"
|
||||
},
|
||||
"@std/assert@1.0.10": {
|
||||
"integrity": "59b5cbac5bd55459a19045d95cc7c2ff787b4f8527c0dd195078ff6f9481fbb3",
|
||||
"dependencies": [
|
||||
"jsr:@std/internal"
|
||||
]
|
||||
},
|
||||
"@std/bytes@1.0.4": {
|
||||
"integrity": "11a0debe522707c95c7b7ef89b478c13fb1583a7cfb9a85674cd2cc2e3a28abc"
|
||||
},
|
||||
"@std/encoding@1.0.6": {
|
||||
"integrity": "ca87122c196e8831737d9547acf001766618e78cd8c33920776c7f5885546069"
|
||||
},
|
||||
"@std/expect@1.0.11": {
|
||||
"integrity": "5aa5d5cf891e9a3249e45ea770de15189e5a2faee2122ee5746b10d1c310a19b",
|
||||
"dependencies": [
|
||||
"jsr:@std/assert",
|
||||
"jsr:@std/internal"
|
||||
]
|
||||
},
|
||||
"@std/internal@1.0.5": {
|
||||
"integrity": "54a546004f769c1ac9e025abd15a76b6671ddc9687e2313b67376125650dc7ba"
|
||||
},
|
||||
"@std/path@1.0.8": {
|
||||
"integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
|
||||
},
|
||||
"@std/streams@1.0.8": {
|
||||
"integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3",
|
||||
"dependencies": [
|
||||
"jsr:@std/bytes@^1.0.3"
|
||||
]
|
||||
}
|
||||
},
|
||||
"npm": {
|
||||
@@ -433,17 +460,11 @@
|
||||
"https://deno.land/x/postgresjs@v3.4.5/src/result.js": "001ff5e0c8d634674f483d07fbcd620a797e3101f842d6c20ca3ace936260465",
|
||||
"https://deno.land/x/postgresjs@v3.4.5/src/subscribe.js": "9e4d0c3e573a6048e77ee2f15abbd5bcd17da9ca85a78c914553472c6d6c169b",
|
||||
"https://deno.land/x/postgresjs@v3.4.5/src/types.js": "471f4a6c35412aa202a7c177c0a7e5a7c3bd225f01bbde67c947894c1b8bf6ed",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/async.ts": "ec1a2d25af2320f136b8648b25b590b7b6603525474f0d10b3ebf2215a5c23e5",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/bytes.ts": "39d4c08f6446041f1d078bbf285187c337d49f853b20ec637cf1516fae8b3729",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/events.ts": "51bf13b819d1c4af792a40ff5d8d08407502d3f01d94f6b6866156f52cbe5d64",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/jit.ts": "1b7eec61ece15c05146446972a59d8d5787d4ba53ca1194f4450134d66a65f91",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.2/mod.ts": "d7ef832245676b097c4fb7829c5cb2df80c02d2bd28767168c4f83bc309c9b1a",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/bytes.ts": "39d4c08f6446041f1d078bbf285187c337d49f853b20ec637cf1516fae8b3729",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/events.ts": "c4f2c856cbc7ac5d93b9af9b83d9550db7427cead32514a10424082e492005ae",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/jit.ts": "260ab418fbc55a5dec594f023c84d36f8d420fd3239e3d27648cba1b9a0e05b1",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/mod.ts": "dd9271f4e5aae4bfb1ec6b0800697ded12e4178af915acb2b96b97614ae8c8d9"
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/async.ts": "20bc54c7260c2d2cd27ffcca33b903dde57a3a3635386d8e0c6baca4b253ae4e",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/bytes.ts": "5ffb12787dc3f9ef9680b6e2e4f5f9903783aa4c33b69e725b5df1d1c116bfe6",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/events.ts": "28d395b8eea87f9bf7908a44b351d2d3c609ba7eab62bcecd0d43be8ee603438",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/func.ts": "f1935f673365cd68939531d65ef18fe81b5d43dc795b03c34bb5ad821ab1c9ff",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/jit.ts": "c1db7820de95c48521b057c7cdf9aa41f7eaba77462407c29d3932e7da252d53",
|
||||
"https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts": "95d8b15048a54cb82391825831f695b74e7c8b206317264a99c906ce25c63f13"
|
||||
}
|
||||
}
|
||||
|
||||
2
lstd.ts
2
lstd.ts
@@ -1 +1 @@
|
||||
export * from "https://git.lua.re/luaneko/lstd/raw/tag/0.1.3/mod.ts";
|
||||
export * from "https://git.lua.re/luaneko/lstd/raw/tag/v0.2.0/mod.ts";
|
||||
|
||||
38
mod.ts
38
mod.ts
@@ -9,7 +9,7 @@ import {
|
||||
unknown,
|
||||
} from "./valita.ts";
|
||||
import { Pool, wire_connect, type LogLevel } from "./wire.ts";
|
||||
import { type FromSql, type ToSql, from_sql, to_sql } from "./sql.ts";
|
||||
import { sql_types, type SqlTypeMap } from "./query.ts";
|
||||
|
||||
export {
|
||||
WireError,
|
||||
@@ -21,13 +21,11 @@ export {
|
||||
} from "./wire.ts";
|
||||
export {
|
||||
type SqlFragment,
|
||||
type FromSql,
|
||||
type ToSql,
|
||||
SqlValue,
|
||||
type SqlType,
|
||||
type SqlTypeMap,
|
||||
SqlTypeError,
|
||||
sql,
|
||||
is_sql,
|
||||
} from "./sql.ts";
|
||||
export {
|
||||
Query,
|
||||
type Row,
|
||||
type CommandResult,
|
||||
@@ -45,8 +43,7 @@ export type Options = {
|
||||
max_connections?: number;
|
||||
idle_timeout?: number;
|
||||
runtime_params?: Record<string, string>;
|
||||
from_sql?: FromSql;
|
||||
to_sql?: ToSql;
|
||||
types?: SqlTypeMap;
|
||||
};
|
||||
|
||||
type ParsedOptions = Infer<typeof ParsedOptions>;
|
||||
@@ -64,15 +61,12 @@ const ParsedOptions = object({
|
||||
runtime_params: record(string()).optional(() => ({})),
|
||||
max_connections: number().optional(() => 10),
|
||||
idle_timeout: number().optional(() => 20),
|
||||
from_sql: unknown()
|
||||
.assert((s): s is FromSql => typeof s === "function")
|
||||
.optional(() => from_sql),
|
||||
to_sql: unknown()
|
||||
.assert((s): s is ToSql => typeof s === "function")
|
||||
.optional(() => to_sql),
|
||||
types: record(unknown())
|
||||
.optional(() => ({}))
|
||||
.map((types): SqlTypeMap => ({ ...sql_types, ...types })),
|
||||
});
|
||||
|
||||
function parse_opts(s: string, options: Options) {
|
||||
function parse_opts(s: string, opts: Options) {
|
||||
const {
|
||||
host,
|
||||
port,
|
||||
@@ -87,13 +81,13 @@ function parse_opts(s: string, options: Options) {
|
||||
Deno.env.toObject();
|
||||
|
||||
return ParsedOptions.parse({
|
||||
...options,
|
||||
host: options.host ?? host ?? PGHOST ?? undefined,
|
||||
port: options.port ?? port ?? PGPORT ?? undefined,
|
||||
user: options.user ?? user ?? PGUSER ?? USER ?? undefined,
|
||||
password: options.password ?? password ?? PGPASSWORD ?? undefined,
|
||||
database: options.database ?? database ?? PGDATABASE ?? undefined,
|
||||
runtime_params: { ...runtime_params, ...options.runtime_params },
|
||||
...opts,
|
||||
host: opts.host ?? host ?? PGHOST ?? undefined,
|
||||
port: opts.port ?? port ?? PGPORT ?? undefined,
|
||||
user: opts.user ?? user ?? PGUSER ?? USER ?? undefined,
|
||||
password: opts.password ?? password ?? PGPASSWORD ?? undefined,
|
||||
database: opts.database ?? database ?? PGDATABASE ?? undefined,
|
||||
runtime_params: { ...runtime_params, ...opts.runtime_params },
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
13
mod_test.ts
13
mod_test.ts
@@ -1,13 +0,0 @@
|
||||
import postgres from "./mod.ts";
|
||||
|
||||
await using pool = postgres(`postgres://test:test@localhost:5432/test`, {
|
||||
runtime_params: { client_min_messages: "INFO" },
|
||||
});
|
||||
|
||||
pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx));
|
||||
|
||||
await pool.begin(async (pg) => {
|
||||
await pg.begin(async (pg) => {
|
||||
console.log(await pg.query`select * from pg_user`);
|
||||
});
|
||||
});
|
||||
378
query.ts
378
query.ts
@@ -1,64 +1,328 @@
|
||||
import type { ObjectType } from "./valita.ts";
|
||||
import { from_utf8, jit, to_utf8 } from "./lstd.ts";
|
||||
import { type FromSql, SqlValue } from "./sql.ts";
|
||||
import { from_hex, to_hex, to_utf8 } from "./lstd.ts";
|
||||
|
||||
export interface Row extends Iterable<unknown, void, void> {
|
||||
[column: string]: unknown;
|
||||
export const sql_format = Symbol.for(`re.lua.pglue.sql_format`);
|
||||
|
||||
export interface SqlFragment {
|
||||
[sql_format](f: SqlFormatter): void;
|
||||
}
|
||||
|
||||
export interface RowConstructor {
|
||||
new (columns: (Uint8Array | string | null)[]): Row;
|
||||
export interface SqlFormatter {
|
||||
query: string;
|
||||
params: unknown[];
|
||||
}
|
||||
|
||||
export interface RowDescription extends ReadonlyArray<ColumnDescription> {}
|
||||
|
||||
export interface ColumnDescription {
|
||||
readonly name: string;
|
||||
readonly table_oid: number;
|
||||
readonly table_column: number;
|
||||
readonly type_oid: number;
|
||||
readonly type_size: number;
|
||||
readonly type_modifier: number;
|
||||
export function is_sql(x: unknown): x is SqlFragment {
|
||||
return typeof x === "object" && x !== null && sql_format in x;
|
||||
}
|
||||
|
||||
export function row_ctor(from_sql: FromSql, columns: RowDescription) {
|
||||
function parse(s: Uint8Array | string | null | undefined) {
|
||||
if (!s && s !== "") return null;
|
||||
else return from_utf8(s);
|
||||
export function sql(
|
||||
{ raw: s }: TemplateStringsArray,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
return {
|
||||
[sql_format](fmt) {
|
||||
for (let i = 0, n = s.length; i < n; i++) {
|
||||
if (i !== 0) fmt_format(fmt, xs[i - 1]);
|
||||
fmt.query += s[i];
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function fmt_write(fmt: SqlFormatter, s: string | SqlFragment) {
|
||||
is_sql(s) ? s[sql_format](fmt) : (fmt.query += s);
|
||||
}
|
||||
|
||||
export function fmt_format(fmt: SqlFormatter, x: unknown) {
|
||||
is_sql(x) ? x[sql_format](fmt) : fmt_enclose(fmt, x);
|
||||
}
|
||||
|
||||
export function fmt_enclose(fmt: SqlFormatter, x: unknown) {
|
||||
const { params } = fmt;
|
||||
params.push(x), (fmt.query += `$` + params.length);
|
||||
}
|
||||
|
||||
sql.format = format;
|
||||
sql.raw = raw;
|
||||
sql.ident = ident;
|
||||
sql.fragment = fragment;
|
||||
sql.map = map;
|
||||
sql.array = array;
|
||||
sql.row = row;
|
||||
|
||||
export function format(sql: SqlFragment) {
|
||||
const fmt: SqlFormatter = { query: "", params: [] };
|
||||
return sql[sql_format](fmt), fmt;
|
||||
}
|
||||
|
||||
export function raw(s: string): SqlFragment;
|
||||
export function raw(s: TemplateStringsArray, ...xs: unknown[]): SqlFragment;
|
||||
export function raw(
|
||||
s: TemplateStringsArray | string,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
s = typeof s === "string" ? s : String.raw(s, ...xs);
|
||||
return {
|
||||
[sql_format](fmt) {
|
||||
fmt.query += s;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function ident(s: string): SqlFragment;
|
||||
export function ident(s: TemplateStringsArray, ...xs: unknown[]): SqlFragment;
|
||||
export function ident(s: TemplateStringsArray | string, ...xs: unknown[]) {
|
||||
s = typeof s === "string" ? s : String.raw(s, ...xs);
|
||||
return raw`"${s.replaceAll('"', '""')}"`;
|
||||
}
|
||||
|
||||
export function fragment(
|
||||
sep: string | SqlFragment,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
return {
|
||||
[sql_format](fmt) {
|
||||
for (let i = 0, n = xs.length; i < n; i++) {
|
||||
if (i !== 0) fmt_write(fmt, sep);
|
||||
fmt_format(fmt, xs[i]);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function map<T>(
|
||||
sep: string | SqlFragment,
|
||||
xs: Iterable<T>,
|
||||
f: (value: T, index: number) => unknown
|
||||
) {
|
||||
return fragment(sep, ...Iterator.from(xs).map(f));
|
||||
}
|
||||
|
||||
export function array(...xs: unknown[]) {
|
||||
return sql`array[${fragment(", ", ...xs)}]`;
|
||||
}
|
||||
|
||||
export function row(...xs: unknown[]) {
|
||||
return sql`row(${fragment(", ", ...xs)})`;
|
||||
}
|
||||
|
||||
export interface SqlType {
|
||||
input(value: string): unknown;
|
||||
output(value: unknown): string | null;
|
||||
}
|
||||
|
||||
export interface SqlTypeMap {
|
||||
readonly [oid: number]: SqlType | undefined;
|
||||
}
|
||||
|
||||
export class SqlTypeError extends TypeError {
|
||||
override get name() {
|
||||
return this.constructor.name;
|
||||
}
|
||||
|
||||
const Row = jit.compiled<RowConstructor>`function Row(xs) {
|
||||
${jit.map(" ", columns, ({ name, type_oid }, i) => {
|
||||
return jit`this[${jit.literal(name)}] = ${from_sql}(
|
||||
new ${SqlValue}(${jit.literal(type_oid)}, ${parse}(xs[${jit.literal(i)}]))
|
||||
);`;
|
||||
})}
|
||||
}`;
|
||||
|
||||
Row.prototype = Object.create(null, {
|
||||
[Symbol.toStringTag]: {
|
||||
configurable: true,
|
||||
value: `Row`,
|
||||
},
|
||||
[Symbol.toPrimitive]: {
|
||||
configurable: true,
|
||||
value: function format() {
|
||||
return [...this].join("\t");
|
||||
},
|
||||
},
|
||||
[Symbol.iterator]: {
|
||||
configurable: true,
|
||||
value: jit.compiled`function* iter() {
|
||||
${jit.map(" ", columns, ({ name }) => {
|
||||
return jit`yield this[${jit.literal(name)}];`;
|
||||
})}
|
||||
}`,
|
||||
},
|
||||
});
|
||||
|
||||
return Row;
|
||||
}
|
||||
|
||||
export const bool: SqlType = {
|
||||
input(s) {
|
||||
return s !== "f";
|
||||
},
|
||||
output(x) {
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
const b = bool_names[String(x).toLowerCase()];
|
||||
if (typeof b === "boolean") return b ? "t" : "f";
|
||||
else throw new SqlTypeError(`invalid bool output '${x}'`);
|
||||
},
|
||||
};
|
||||
|
||||
const bool_names: Partial<Record<string, boolean>> = {
|
||||
// https://www.postgresql.org/docs/current/datatype-boolean.html#DATATYPE-BOOLEAN
|
||||
t: true,
|
||||
tr: true,
|
||||
tru: true,
|
||||
true: true,
|
||||
y: true,
|
||||
ye: true,
|
||||
yes: true,
|
||||
on: true,
|
||||
1: true,
|
||||
f: false,
|
||||
fa: false,
|
||||
fal: false,
|
||||
fals: false,
|
||||
false: false,
|
||||
n: false,
|
||||
no: false,
|
||||
of: false,
|
||||
off: false,
|
||||
0: false,
|
||||
};
|
||||
|
||||
export const text: SqlType = {
|
||||
input(s) {
|
||||
return s;
|
||||
},
|
||||
output(x) {
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "string") return x;
|
||||
else return String(x);
|
||||
},
|
||||
};
|
||||
|
||||
export const int2: SqlType = {
|
||||
input(s) {
|
||||
const n = Number(s);
|
||||
if (Number.isInteger(n) && -32768 <= n && n <= 32767) return n;
|
||||
else throw new SqlTypeError(`invalid int2 input '${s}'`);
|
||||
},
|
||||
output(x) {
|
||||
let n: number;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "number") n = x;
|
||||
else n = Number(x);
|
||||
if (Number.isInteger(n) && -32768 <= n && n <= 32767) return n.toString();
|
||||
else throw new SqlTypeError(`invalid int2 output '${x}'`);
|
||||
},
|
||||
};
|
||||
|
||||
export const int4: SqlType = {
|
||||
input(s) {
|
||||
const n = Number(s);
|
||||
if (Number.isInteger(n) && -2147483648 <= n && n <= 2147483647) return n;
|
||||
else throw new SqlTypeError(`invalid int4 input '${s}'`);
|
||||
},
|
||||
output(x) {
|
||||
let n: number;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "number") n = x;
|
||||
else n = Number(x);
|
||||
if (Number.isInteger(n) && -2147483648 <= n && n <= 2147483647)
|
||||
return n.toString();
|
||||
else throw new SqlTypeError(`invalid int4 output '${x}'`);
|
||||
},
|
||||
};
|
||||
|
||||
export const int8: SqlType = {
|
||||
input(s) {
|
||||
const n = BigInt(s);
|
||||
if (-9007199254740991n <= n && n <= 9007199254740991n) return Number(n);
|
||||
else if (-9223372036854775808n <= n && n <= 9223372036854775807n) return n;
|
||||
else throw new SqlTypeError(`invalid int8 input '${s}'`);
|
||||
},
|
||||
output(x) {
|
||||
let n: number | bigint;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "number" || typeof x === "bigint") n = x;
|
||||
else if (typeof x === "string") n = BigInt(x);
|
||||
else n = Number(x);
|
||||
if (Number.isInteger(n)) {
|
||||
if (-9007199254740991 <= n && n <= 9007199254740991) return n.toString();
|
||||
else throw new SqlTypeError(`unsafe int8 output '${x}'`);
|
||||
} else if (typeof n === "bigint") {
|
||||
if (-9223372036854775808n <= n && n <= 9223372036854775807n)
|
||||
return n.toString();
|
||||
}
|
||||
throw new SqlTypeError(`invalid int8 output '${x}'`);
|
||||
},
|
||||
};
|
||||
|
||||
export const float4: SqlType = {
|
||||
input(s) {
|
||||
return Math.fround(Number(s));
|
||||
},
|
||||
output(x) {
|
||||
let n: number;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "number") n = x;
|
||||
else {
|
||||
n = Number(x);
|
||||
if (Number.isNaN(n))
|
||||
throw new SqlTypeError(`invalid float4 output '${x}'`);
|
||||
}
|
||||
return Math.fround(n).toString();
|
||||
},
|
||||
};
|
||||
|
||||
export const float8: SqlType = {
|
||||
input(s) {
|
||||
return Number(s);
|
||||
},
|
||||
output(x) {
|
||||
let n: number;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "number") n = x;
|
||||
else {
|
||||
n = Number(x);
|
||||
if (Number.isNaN(n))
|
||||
throw new SqlTypeError(`invalid float8 output '${x}'`);
|
||||
}
|
||||
return n.toString();
|
||||
},
|
||||
};
|
||||
|
||||
export const timestamptz: SqlType = {
|
||||
input(s) {
|
||||
const t = Date.parse(s);
|
||||
if (!Number.isNaN(t)) return new Date(t);
|
||||
else throw new SqlTypeError(`invalid timestamptz input '${s}'`);
|
||||
},
|
||||
output(x) {
|
||||
let t: Date;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (x instanceof Date) t = x;
|
||||
else if (typeof x === "number" || typeof x === "bigint")
|
||||
t = new Date(Number(x) * 1000); // unix epoch seconds
|
||||
else t = new Date(String(x));
|
||||
if (Number.isFinite(t.getTime())) return t.toISOString();
|
||||
else throw new SqlTypeError(`invalid timestamptz output '${x}'`);
|
||||
},
|
||||
};
|
||||
|
||||
export const bytea: SqlType = {
|
||||
input(s) {
|
||||
if (s.startsWith(`\\x`)) return from_hex(s.slice(2));
|
||||
else throw new SqlTypeError(`invalid bytea input '${s}'`);
|
||||
},
|
||||
output(x) {
|
||||
let buf: Uint8Array;
|
||||
if (typeof x === "undefined" || x === null) return null;
|
||||
else if (typeof x === "string") buf = to_utf8(x);
|
||||
else if (x instanceof Uint8Array) buf = x;
|
||||
else if (x instanceof ArrayBuffer || x instanceof SharedArrayBuffer)
|
||||
buf = new Uint8Array(x);
|
||||
else if (Array.isArray(x) || x instanceof Array) buf = Uint8Array.from(x);
|
||||
else throw new SqlTypeError(`invalid bytea output '${x}'`);
|
||||
return `\\x` + to_hex(buf);
|
||||
},
|
||||
};
|
||||
|
||||
export const json: SqlType = {
|
||||
input(s) {
|
||||
return JSON.parse(s);
|
||||
},
|
||||
output(x) {
|
||||
return typeof x === "undefined" ? null : JSON.stringify(x);
|
||||
},
|
||||
};
|
||||
|
||||
export const sql_types: SqlTypeMap = {
|
||||
16: bool, // bool
|
||||
25: text, // text
|
||||
21: int2, // int2
|
||||
23: int4, // int4
|
||||
20: int8, // int8
|
||||
26: int8, // oid
|
||||
700: float4, // float4
|
||||
701: float8, // float8
|
||||
1082: timestamptz, // date
|
||||
1114: timestamptz, // timestamp
|
||||
1184: timestamptz, // timestamptz
|
||||
17: bytea, // bytea
|
||||
114: json, // json
|
||||
3802: json, // jsonb
|
||||
};
|
||||
|
||||
sql.types = sql_types;
|
||||
|
||||
type ReadonlyTuple<T extends readonly unknown[]> = readonly [...T];
|
||||
|
||||
export interface CommandResult {
|
||||
@@ -76,6 +340,10 @@ export interface Results<T> extends CommandResult, ReadonlyArray<T> {
|
||||
export interface ResultStream<T>
|
||||
extends AsyncIterable<T[], CommandResult, void> {}
|
||||
|
||||
export interface Row extends Iterable<unknown, void, void> {
|
||||
[column: string]: unknown;
|
||||
}
|
||||
|
||||
export interface QueryOptions {
|
||||
readonly chunk_size: number;
|
||||
readonly stdin: ReadableStream<Uint8Array> | null;
|
||||
@@ -159,7 +427,7 @@ export class Query<T = Row>
|
||||
|
||||
async first(): Promise<Result<T>> {
|
||||
const { rows, tag } = await this.collect(1);
|
||||
if (!rows.length) throw new Error(`expected one row, got none instead`);
|
||||
if (!rows.length) throw new TypeError(`expected one row, got none instead`);
|
||||
const row = rows[0];
|
||||
return Object.assign([row] as const, { row: rows[0], tag });
|
||||
}
|
||||
@@ -175,8 +443,9 @@ export class Query<T = Row>
|
||||
let next;
|
||||
const rows = [];
|
||||
for (let i = 0; !(next = await iter.next()).done; ) {
|
||||
const { value: c } = next;
|
||||
for (let j = 0, n = c.length; i < count && j < n; ) rows[i++] = c[j++];
|
||||
const chunk = next.value;
|
||||
for (let j = 0, n = chunk.length; i < count && j < n; )
|
||||
rows[i++] = chunk[j++];
|
||||
}
|
||||
return Object.assign(rows, next.value, { rows });
|
||||
}
|
||||
@@ -211,7 +480,8 @@ function str_to_stream(s: string) {
|
||||
return new ReadableStream({
|
||||
type: "bytes",
|
||||
start(c) {
|
||||
c.enqueue(to_utf8(s)), c.close();
|
||||
if (s.length !== 0) c.enqueue(to_utf8(s));
|
||||
c.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
99
ser.ts
99
ser.ts
@@ -1,4 +1,15 @@
|
||||
import { encode_utf8, from_utf8, jit } from "./lstd.ts";
|
||||
import {
|
||||
type BinaryLike,
|
||||
encode_utf8,
|
||||
from_utf8,
|
||||
jit,
|
||||
read_i16_be,
|
||||
read_i32_be,
|
||||
read_i8,
|
||||
write_i16_be,
|
||||
write_i32_be,
|
||||
write_i8,
|
||||
} from "./lstd.ts";
|
||||
|
||||
export class EncoderError extends Error {
|
||||
override get name() {
|
||||
@@ -30,44 +41,31 @@ export interface Encoder<T> {
|
||||
export type EncoderType<E extends Encoder<unknown>> =
|
||||
E extends Encoder<infer T> ? T : never;
|
||||
|
||||
export function sum_const_size(...ns: (number | null)[]) {
|
||||
let sum = 0;
|
||||
for (const n of ns) {
|
||||
if (n !== null) sum += n;
|
||||
else return null;
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
// https://www.postgresql.org/docs/current/protocol-message-types.html#PROTOCOL-MESSAGE-TYPES
|
||||
export const u8: Encoder<number> = {
|
||||
export const i8: Encoder<number> = {
|
||||
const_size: 1,
|
||||
allocs() {
|
||||
return 1;
|
||||
},
|
||||
encode(buf, cur, n) {
|
||||
buf[cur.i++] = n & 0xff;
|
||||
write_i8(buf, n, cur.i++);
|
||||
},
|
||||
decode(buf, cur) {
|
||||
return buf[cur.i++];
|
||||
return read_i8(buf, cur.i++);
|
||||
},
|
||||
};
|
||||
|
||||
export const u16: Encoder<number> = {
|
||||
export const i16: Encoder<number> = {
|
||||
const_size: 2,
|
||||
allocs() {
|
||||
return 2;
|
||||
},
|
||||
encode(buf, cur, n) {
|
||||
let { i } = cur;
|
||||
buf[i++] = (n >>> 8) & 0xff;
|
||||
buf[i++] = n & 0xff;
|
||||
cur.i = i;
|
||||
write_i16_be(buf, n, cur.i), (cur.i += 2);
|
||||
},
|
||||
decode(buf, cur) {
|
||||
let { i } = cur;
|
||||
const n = (buf[i++] << 8) + buf[i++];
|
||||
return (cur.i = i), n;
|
||||
const n = read_i16_be(buf, cur.i);
|
||||
return (cur.i += 2), n;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -77,17 +75,11 @@ export const i32: Encoder<number> = {
|
||||
return 4;
|
||||
},
|
||||
encode(buf, cur, n) {
|
||||
let { i } = cur;
|
||||
buf[i++] = (n >>> 24) & 0xff;
|
||||
buf[i++] = (n >>> 16) & 0xff;
|
||||
buf[i++] = (n >>> 8) & 0xff;
|
||||
buf[i++] = n & 0xff;
|
||||
cur.i = i;
|
||||
write_i32_be(buf, n, cur.i), (cur.i += 4);
|
||||
},
|
||||
decode(buf, cur) {
|
||||
let { i } = cur;
|
||||
const n = (buf[i++] << 24) + (buf[i++] << 16) + (buf[i++] << 8) + buf[i++];
|
||||
return (cur.i = i), n;
|
||||
const n = read_i32_be(buf, cur.i);
|
||||
return (cur.i += 4), n;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -118,7 +110,21 @@ export function byten(n: number): Encoder<Uint8Array> {
|
||||
};
|
||||
}
|
||||
|
||||
export const byten_lp: Encoder<Uint8Array | string | null> = {
|
||||
export const bytes: Encoder<BinaryLike> = {
|
||||
const_size: null,
|
||||
allocs(s) {
|
||||
if (typeof s === "string") return s.length * 3;
|
||||
else return s.length;
|
||||
},
|
||||
encode(buf, cur, s) {
|
||||
cur.i += encode_utf8(s, buf.subarray(cur.i));
|
||||
},
|
||||
decode(buf, cur) {
|
||||
return buf.subarray(cur.i, (cur.i = buf.length));
|
||||
},
|
||||
};
|
||||
|
||||
export const bytes_lp: Encoder<BinaryLike | null> = {
|
||||
const_size: null,
|
||||
allocs(s) {
|
||||
let size = 4;
|
||||
@@ -140,20 +146,6 @@ export const byten_lp: Encoder<Uint8Array | string | null> = {
|
||||
},
|
||||
};
|
||||
|
||||
export const byten_rest: Encoder<Uint8Array | string> = {
|
||||
const_size: null,
|
||||
allocs(s) {
|
||||
if (typeof s === "string") return s.length * 3;
|
||||
else return s.length;
|
||||
},
|
||||
encode(buf, cur, s) {
|
||||
cur.i += encode_utf8(s, buf.subarray(cur.i));
|
||||
},
|
||||
decode(buf, cur) {
|
||||
return buf.subarray(cur.i, (cur.i = buf.length));
|
||||
},
|
||||
};
|
||||
|
||||
export const cstring: Encoder<string> = {
|
||||
const_size: null,
|
||||
allocs(s) {
|
||||
@@ -215,7 +207,7 @@ export function array<T>(
|
||||
): ArrayEncoder<T> {
|
||||
const { const_size } = type;
|
||||
return {
|
||||
const_size,
|
||||
const_size: null,
|
||||
allocs:
|
||||
const_size !== null
|
||||
? function allocs(xs: T[]) {
|
||||
@@ -249,21 +241,28 @@ export interface ObjectEncoder<S extends ObjectShape>
|
||||
export function object<S extends ObjectShape>(shape: S): ObjectEncoder<S> {
|
||||
const keys = Object.keys(shape);
|
||||
return jit.compiled`{
|
||||
const_size: ${jit.literal(sum_const_size(...keys.map((k) => shape[k].const_size)))},
|
||||
const_size: null,
|
||||
allocs(x) {
|
||||
return ${jit.if(
|
||||
keys.length === 0,
|
||||
jit`0`,
|
||||
jit.map(" + ", keys, (k) => {
|
||||
return shape[k].const_size ?? jit`${shape[k]}.allocs(x[${k}])`;
|
||||
})
|
||||
)};
|
||||
return 0${jit.map("", keys, (k) => {
|
||||
return jit` + ${shape[k]}.allocs(x[${jit.literal(k)}])`;
|
||||
return jit` + ${shape[k]}.allocs(x[${k}])`;
|
||||
})};
|
||||
},
|
||||
encode(buf, cur, x) {
|
||||
${jit.map(" ", keys, (k) => {
|
||||
return jit`${shape[k]}.encode(buf, cur, x[${jit.literal(k)}]);`;
|
||||
return jit`${shape[k]}.encode(buf, cur, x[${k}]);`;
|
||||
})}
|
||||
},
|
||||
decode(buf, cur) {
|
||||
return {
|
||||
${jit.map(", ", keys, (k) => {
|
||||
return jit`[${jit.literal(k)}]: ${shape[k]}.decode(buf, cur)`;
|
||||
return jit`[${k}]: ${shape[k]}.decode(buf, cur)`;
|
||||
})}
|
||||
};
|
||||
},
|
||||
|
||||
386
sql.ts
386
sql.ts
@@ -1,386 +0,0 @@
|
||||
import { from_hex, to_hex } from "./lstd.ts";
|
||||
|
||||
export const sql_format = Symbol.for(`re.lua.pglue.sql_format`);
|
||||
|
||||
export interface SqlFragment {
|
||||
[sql_format](f: SqlFormatter): void;
|
||||
}
|
||||
|
||||
export function is_sql(x: unknown): x is SqlFragment {
|
||||
return typeof x === "object" && x !== null && sql_format in x;
|
||||
}
|
||||
|
||||
export interface FromSql {
|
||||
(x: SqlValue): unknown;
|
||||
}
|
||||
|
||||
export interface ToSql {
|
||||
(x: unknown): SqlFragment;
|
||||
}
|
||||
|
||||
export const from_sql = function from_sql(x) {
|
||||
const { type, value } = x;
|
||||
if (value === null) return null;
|
||||
|
||||
switch (type) {
|
||||
case 16: // boolean
|
||||
return boolean.parse(value);
|
||||
case 25: // text
|
||||
return text.parse(value);
|
||||
case 21: // int2
|
||||
return int2.parse(value);
|
||||
case 23: // int4
|
||||
return int4.parse(value);
|
||||
case 20: // int8
|
||||
case 26: // oid
|
||||
return int8.parse(value);
|
||||
case 700: // float4
|
||||
return float4.parse(value);
|
||||
case 701: // float8
|
||||
return float8.parse(value);
|
||||
case 1082: // date
|
||||
case 1114: // timestamp
|
||||
case 1184: // timestamptz
|
||||
return timestamptz.parse(value);
|
||||
case 17: // bytea
|
||||
return bytea.parse(value);
|
||||
case 114: // json
|
||||
case 3802: // jsonb
|
||||
return json.parse(value);
|
||||
default:
|
||||
return x;
|
||||
}
|
||||
} as FromSql;
|
||||
|
||||
export const to_sql = function to_sql(x) {
|
||||
switch (typeof x) {
|
||||
case "undefined":
|
||||
return nil();
|
||||
case "boolean":
|
||||
return boolean(x);
|
||||
case "number":
|
||||
return float8(x);
|
||||
case "bigint":
|
||||
return int8(x);
|
||||
case "string":
|
||||
case "symbol":
|
||||
case "function":
|
||||
return text(x);
|
||||
}
|
||||
|
||||
switch (true) {
|
||||
case x === null:
|
||||
return nil();
|
||||
|
||||
case is_sql(x):
|
||||
return x;
|
||||
|
||||
case Array.isArray(x):
|
||||
return array(...(x instanceof Array ? x : Array.from(x)));
|
||||
|
||||
case x instanceof Date:
|
||||
return timestamptz(x);
|
||||
|
||||
case x instanceof Uint8Array:
|
||||
case x instanceof ArrayBuffer:
|
||||
case x instanceof SharedArrayBuffer:
|
||||
return bytea(x);
|
||||
}
|
||||
|
||||
throw new TypeError(`cannot convert input '${x}' to sql`);
|
||||
} as ToSql;
|
||||
|
||||
export class SqlValue implements SqlFragment {
|
||||
constructor(
|
||||
readonly type: number,
|
||||
readonly value: string | null
|
||||
) {}
|
||||
|
||||
[sql_format](f: SqlFormatter) {
|
||||
f.write_param(this.type, this.value);
|
||||
}
|
||||
|
||||
[Symbol.toStringTag]() {
|
||||
return `${this.constructor.name}<${this.type}>`;
|
||||
}
|
||||
|
||||
[Symbol.toPrimitive]() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
toString() {
|
||||
return String(this.value);
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return this.value;
|
||||
}
|
||||
}
|
||||
|
||||
export function value(type: number, x: unknown) {
|
||||
const s = x === null || typeof x === "undefined" ? null : String(x);
|
||||
return new SqlValue(type, s);
|
||||
}
|
||||
|
||||
export class SqlFormatter {
|
||||
readonly #ser;
|
||||
#query = "";
|
||||
#params = {
|
||||
types: [] as number[],
|
||||
values: [] as (string | null)[],
|
||||
};
|
||||
|
||||
get query() {
|
||||
return this.#query.trim();
|
||||
}
|
||||
|
||||
get params() {
|
||||
return this.#params;
|
||||
}
|
||||
|
||||
constructor(serializer: ToSql) {
|
||||
this.#ser = serializer;
|
||||
}
|
||||
|
||||
write(s: string | SqlFragment) {
|
||||
if (is_sql(s)) s[sql_format](this);
|
||||
else this.#query += s;
|
||||
}
|
||||
|
||||
write_param(type: number, s: string | null) {
|
||||
const { types, values } = this.#params;
|
||||
types.push(type), values.push(s), this.write(`$` + values.length);
|
||||
}
|
||||
|
||||
format(x: unknown) {
|
||||
this.write(is_sql(x) ? x : this.#ser(x));
|
||||
}
|
||||
}
|
||||
|
||||
export function format(sql: SqlFragment, serializer = to_sql) {
|
||||
const fmt = new SqlFormatter(serializer);
|
||||
return fmt.write(sql), fmt;
|
||||
}
|
||||
|
||||
export function sql(
|
||||
{ raw: s }: TemplateStringsArray,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
return {
|
||||
[sql_format](f) {
|
||||
for (let i = 0, n = s.length; i < n; i++) {
|
||||
if (i !== 0) f.format(xs[i - 1]);
|
||||
f.write(s[i]);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
sql.value = value;
|
||||
sql.format = format;
|
||||
sql.raw = raw;
|
||||
sql.ident = ident;
|
||||
sql.fragment = fragment;
|
||||
sql.map = map;
|
||||
sql.array = array;
|
||||
sql.row = row;
|
||||
sql.null = nil;
|
||||
sql.boolean = boolean;
|
||||
sql.text = text;
|
||||
sql.int2 = int2;
|
||||
sql.int4 = int4;
|
||||
sql.int8 = int8;
|
||||
sql.float4 = float4;
|
||||
sql.float8 = float8;
|
||||
sql.timestamptz = timestamptz;
|
||||
sql.bytea = bytea;
|
||||
sql.json = json;
|
||||
|
||||
export function raw(s: TemplateStringsArray, ...xs: unknown[]): SqlFragment;
|
||||
export function raw(s: string): SqlFragment;
|
||||
export function raw(
|
||||
s: TemplateStringsArray | string,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
s = typeof s === "string" ? s : String.raw(s, ...xs);
|
||||
return {
|
||||
[sql_format](f) {
|
||||
f.write(s);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function ident(s: TemplateStringsArray, ...xs: unknown[]): SqlFragment;
|
||||
export function ident(s: string): SqlFragment;
|
||||
export function ident(s: TemplateStringsArray | string, ...xs: unknown[]) {
|
||||
s = typeof s === "string" ? s : String.raw(s, ...xs);
|
||||
return raw`"${s.replaceAll('"', '""')}"`;
|
||||
}
|
||||
|
||||
export function fragment(
|
||||
sep: string | SqlFragment,
|
||||
...xs: unknown[]
|
||||
): SqlFragment {
|
||||
return {
|
||||
[sql_format](f) {
|
||||
for (let i = 0, n = xs.length; i < n; i++) {
|
||||
if (i !== 0) f.write(sep);
|
||||
f.format(xs[i]);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function map<T>(
|
||||
sep: string | SqlFragment,
|
||||
xs: Iterable<T>,
|
||||
f: (value: T, index: number) => unknown
|
||||
): SqlFragment {
|
||||
return fragment(sep, ...Iterator.from(xs).map(f));
|
||||
}
|
||||
|
||||
export function array(...xs: unknown[]): SqlFragment {
|
||||
return sql`array[${fragment(", ", ...xs)}]`;
|
||||
}
|
||||
|
||||
export function row(...xs: unknown[]): SqlFragment {
|
||||
return sql`row(${fragment(", ", ...xs)})`;
|
||||
}
|
||||
|
||||
boolean.oid = 16 as const;
|
||||
text.oid = 25 as const;
|
||||
int2.oid = 21 as const;
|
||||
int4.oid = 23 as const;
|
||||
int8.oid = 20 as const;
|
||||
float4.oid = 700 as const;
|
||||
float8.oid = 701 as const;
|
||||
timestamptz.oid = 1184 as const;
|
||||
bytea.oid = 17 as const;
|
||||
json.oid = 114 as const;
|
||||
|
||||
export function nil() {
|
||||
return value(0, null);
|
||||
}
|
||||
|
||||
Object.defineProperty(nil, "name", { configurable: true, value: "null" });
|
||||
|
||||
export function boolean(x: unknown) {
|
||||
return value(
|
||||
boolean.oid,
|
||||
x === null || typeof x === "undefined" ? null : x ? "t" : "f"
|
||||
);
|
||||
}
|
||||
|
||||
boolean.parse = function parse_boolean(s: string) {
|
||||
return s === "t";
|
||||
};
|
||||
|
||||
export function text(x: unknown) {
|
||||
return value(text.oid, x);
|
||||
}
|
||||
|
||||
text.parse = function parse_text(s: string) {
|
||||
return s;
|
||||
};
|
||||
|
||||
const i2_min = -32768;
|
||||
const i2_max = 32767;
|
||||
|
||||
export function int2(x: unknown) {
|
||||
return value(int2.oid, x);
|
||||
}
|
||||
|
||||
int2.parse = function parse_int2(s: string) {
|
||||
const n = Number(s);
|
||||
if (Number.isInteger(n) && i2_min <= n && n <= i2_max) return n;
|
||||
else throw new TypeError(`input '${s}' is not a valid int2 value`);
|
||||
};
|
||||
|
||||
const i4_min = -2147483648;
|
||||
const i4_max = 2147483647;
|
||||
|
||||
export function int4(x: unknown) {
|
||||
return value(int4.oid, x);
|
||||
}
|
||||
|
||||
int4.parse = function parse_int4(s: string) {
|
||||
const n = Number(s);
|
||||
if (Number.isInteger(n) && i4_min <= n && n <= i4_max) return n;
|
||||
else throw new TypeError(`input '${s}' is not a valid int4 value`);
|
||||
};
|
||||
|
||||
const i8_min = -9223372036854775808n;
|
||||
const i8_max = 9223372036854775807n;
|
||||
|
||||
export function int8(x: unknown) {
|
||||
return value(int8.oid, x);
|
||||
}
|
||||
|
||||
function to_int8(n: number | bigint) {
|
||||
if (typeof n === "bigint") return i8_min <= n && n <= i8_max ? n : null;
|
||||
else return Number.isSafeInteger(n) ? BigInt(n) : null;
|
||||
}
|
||||
|
||||
int8.parse = function parse_int8(s: string) {
|
||||
const n = to_int8(BigInt(s));
|
||||
if (n !== null) return to_float8(n) ?? n;
|
||||
else throw new TypeError(`input '${s}' is not a valid int8 value`);
|
||||
};
|
||||
|
||||
const f8_min = -9007199254740991n;
|
||||
const f8_max = 9007199254740991n;
|
||||
|
||||
export function float4(x: unknown) {
|
||||
return value(float4.oid, x);
|
||||
}
|
||||
|
||||
export function float8(x: unknown) {
|
||||
return value(float8.oid, x);
|
||||
}
|
||||
|
||||
function to_float8(n: number | bigint) {
|
||||
if (typeof n === "bigint")
|
||||
return f8_min <= n && n <= f8_max ? Number(n) : null;
|
||||
else return Number.isNaN(n) ? null : n;
|
||||
}
|
||||
|
||||
float4.parse = float8.parse = function parse_float8(s: string) {
|
||||
const n = to_float8(Number(s));
|
||||
if (n !== null) return n;
|
||||
else throw new TypeError(`input '${s}' is not a valid float8 value`);
|
||||
};
|
||||
|
||||
export function timestamptz(x: unknown) {
|
||||
if (x instanceof Date) x = x.toISOString();
|
||||
else if (typeof x === "number" || typeof x === "bigint")
|
||||
x = new Date(Number(x) * 1000).toISOString(); // unix epoch
|
||||
return value(timestamptz.oid, x);
|
||||
}
|
||||
|
||||
timestamptz.parse = function parse_timestamptz(s: string) {
|
||||
const t = Date.parse(s);
|
||||
if (!Number.isNaN(t)) return new Date(t);
|
||||
else throw new TypeError(`input '${s}' is not a valid timestamptz value`);
|
||||
};
|
||||
|
||||
export function bytea(x: Uint8Array | ArrayBufferLike | Iterable<number>) {
|
||||
let buf;
|
||||
if (x instanceof Uint8Array) buf = x;
|
||||
else if (x instanceof ArrayBuffer || x instanceof SharedArrayBuffer)
|
||||
buf = new Uint8Array(x);
|
||||
else buf = Uint8Array.from(x);
|
||||
return value(bytea.oid, `\\x` + to_hex(buf));
|
||||
}
|
||||
|
||||
bytea.parse = function parse_bytea(s: string) {
|
||||
if (s.startsWith(`\\x`)) return from_hex(s.slice(2));
|
||||
else throw new TypeError(`input is not a valid bytea value`);
|
||||
};
|
||||
|
||||
export function json(x: unknown) {
|
||||
return value(json.oid, JSON.stringify(x) ?? null);
|
||||
}
|
||||
|
||||
json.parse = function parse_json(s: string): unknown {
|
||||
return JSON.parse(s);
|
||||
};
|
||||
209
test.ts
Normal file
209
test.ts
Normal file
@@ -0,0 +1,209 @@
|
||||
import pglue, { PostgresError, SqlTypeError } from "./mod.ts";
|
||||
import { expect } from "jsr:@std/expect";
|
||||
import { toText } from "jsr:@std/streams";
|
||||
|
||||
async function connect() {
|
||||
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
|
||||
runtime_params: { client_min_messages: "INFO" },
|
||||
});
|
||||
|
||||
return pg.on("log", (_level, ctx, msg) => {
|
||||
console.info(`${msg}`, ctx);
|
||||
});
|
||||
}
|
||||
|
||||
Deno.test(`integers`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
const [{ a, b, c }] = await pg.query`
|
||||
select
|
||||
${"0x100"}::int2 as a,
|
||||
${777}::int4 as b,
|
||||
${{
|
||||
[Symbol.toPrimitive](hint: string) {
|
||||
expect(hint).toBe("number");
|
||||
return "1234";
|
||||
},
|
||||
}}::int8 as c
|
||||
`.first();
|
||||
|
||||
expect(a).toBe(0x100);
|
||||
expect(b).toBe(777);
|
||||
expect(c).toBe(1234);
|
||||
|
||||
const [{ large }] =
|
||||
await pg.query`select ${"10000000000000000"}::int8 as large`.first();
|
||||
|
||||
expect(large).toBe(10000000000000000n);
|
||||
|
||||
await expect(pg.query`select ${100000}::int2`).rejects.toThrow(SqlTypeError);
|
||||
await expect(pg.query`select ${"100000"}::text::int2`).rejects.toThrow(
|
||||
PostgresError
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test(`boolean`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
const [{ a, b, c }] = await pg.query`
|
||||
select
|
||||
${true}::bool as a,
|
||||
${"n"}::bool as b,
|
||||
${undefined}::bool as c
|
||||
`.first();
|
||||
|
||||
expect(a).toBe(true);
|
||||
expect(b).toBe(false);
|
||||
expect(c).toBe(null);
|
||||
});
|
||||
|
||||
Deno.test(`bytea`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
const [{ string, array, buffer }] = await pg.query`
|
||||
select
|
||||
${"hello, world"}::bytea as string,
|
||||
${[1, 2, 3, 4, 5]}::bytea as array,
|
||||
${Uint8Array.of(5, 4, 3, 2, 1)}::bytea as buffer
|
||||
`.first();
|
||||
|
||||
expect(string).toEqual(new TextEncoder().encode("hello, world"));
|
||||
expect(array).toEqual(Uint8Array.of(1, 2, 3, 4, 5));
|
||||
expect(buffer).toEqual(Uint8Array.of(5, 4, 3, 2, 1));
|
||||
});
|
||||
|
||||
Deno.test(`row`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
expect(
|
||||
(
|
||||
await pg.query`create table my_table (a text not null, b text not null, c text not null)`
|
||||
).tag
|
||||
).toBe(`CREATE TABLE`);
|
||||
|
||||
expect(
|
||||
(
|
||||
await pg.query`copy my_table from stdin`.stdin(
|
||||
`field a\tfield b\tfield c`
|
||||
)
|
||||
).tag
|
||||
).toBe(`COPY 1`);
|
||||
|
||||
const [row] = await pg.query`select * from my_table`.first();
|
||||
{
|
||||
// columns by name
|
||||
const { a, b, c } = row;
|
||||
expect(a).toBe("field a");
|
||||
expect(b).toBe("field b");
|
||||
expect(c).toBe("field c");
|
||||
}
|
||||
{
|
||||
// columns by index
|
||||
const [a, b, c] = row;
|
||||
expect(a).toBe("field a");
|
||||
expect(b).toBe("field b");
|
||||
expect(c).toBe("field c");
|
||||
}
|
||||
|
||||
const { readable, writable } = new TransformStream<Uint8Array>(
|
||||
{},
|
||||
new ByteLengthQueuingStrategy({ highWaterMark: 4096 }),
|
||||
new ByteLengthQueuingStrategy({ highWaterMark: 4096 })
|
||||
);
|
||||
await pg.query`copy my_table to stdout`.stdout(writable);
|
||||
expect(await toText(readable)).toBe(`field a\tfield b\tfield c\n`);
|
||||
});
|
||||
|
||||
Deno.test(`sql injection`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
const input = `injection'); drop table users; --`;
|
||||
|
||||
expect((await pg.query`create table users (name text not null)`).tag).toBe(
|
||||
`CREATE TABLE`
|
||||
);
|
||||
|
||||
expect((await pg.query`insert into users (name) values (${input})`).tag).toBe(
|
||||
`INSERT 0 1`
|
||||
);
|
||||
|
||||
const [{ name }] = await pg.query<{ name: string }>`
|
||||
select name from users
|
||||
`.first();
|
||||
|
||||
expect(name).toBe(input);
|
||||
});
|
||||
|
||||
Deno.test(`pubsub`, async () => {
|
||||
await using pg = await connect();
|
||||
const sent: string[] = [];
|
||||
|
||||
await using ch = await pg.listen(`my channel`, (payload) => {
|
||||
expect(payload).toBe(sent.shift());
|
||||
});
|
||||
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const payload = `test payload ${i}`;
|
||||
sent.push(payload);
|
||||
await ch.notify(payload);
|
||||
}
|
||||
});
|
||||
|
||||
Deno.test(`transactions`, async () => {
|
||||
await using pg = await connect();
|
||||
|
||||
await pg.begin(async (pg) => {
|
||||
await pg.begin(async (pg, tx) => {
|
||||
await pg.query`create table my_table (field text not null)`;
|
||||
await tx.rollback();
|
||||
});
|
||||
|
||||
await expect(pg.query`select * from my_table`).rejects.toThrow(
|
||||
PostgresError
|
||||
);
|
||||
});
|
||||
|
||||
await expect(pg.query`select * from my_table`).rejects.toThrow(PostgresError);
|
||||
|
||||
await pg.begin(async (pg) => {
|
||||
await pg.begin(async (pg, tx) => {
|
||||
await pg.begin(async (pg, tx) => {
|
||||
await pg.begin(async (pg) => {
|
||||
await pg.query`create table my_table (field text not null)`;
|
||||
});
|
||||
await tx.commit();
|
||||
});
|
||||
|
||||
expect(await pg.query`select * from my_table`.count()).toBe(0);
|
||||
await tx.rollback();
|
||||
});
|
||||
|
||||
await expect(pg.query`select * from my_table`).rejects.toThrow(
|
||||
PostgresError
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
Deno.test(`streaming`, async () => {
|
||||
await using pg = await connect();
|
||||
await using _tx = await pg.begin();
|
||||
|
||||
await pg.query`create table my_table (field text not null)`;
|
||||
|
||||
for (let i = 0; i < 100; i++) {
|
||||
await pg.query`insert into my_table (field) values (${i})`;
|
||||
}
|
||||
|
||||
let i = 0;
|
||||
for await (const chunk of pg.query`select * from my_table`.chunked(10)) {
|
||||
expect(chunk.length).toBe(10);
|
||||
for (const row of chunk) expect(row.field).toBe(`${i++}`);
|
||||
}
|
||||
|
||||
expect(i).toBe(100);
|
||||
});
|
||||
337
wire.ts
337
wire.ts
@@ -1,10 +1,12 @@
|
||||
import {
|
||||
type BinaryLike,
|
||||
buf_concat_fast,
|
||||
buf_eq,
|
||||
buf_xor,
|
||||
channel,
|
||||
from_base64,
|
||||
from_utf8,
|
||||
jit,
|
||||
semaphore,
|
||||
semaphore_fast,
|
||||
to_base64,
|
||||
@@ -14,8 +16,8 @@ import {
|
||||
import {
|
||||
array,
|
||||
byten,
|
||||
byten_lp,
|
||||
byten_rest,
|
||||
bytes_lp,
|
||||
bytes,
|
||||
char,
|
||||
cstring,
|
||||
type Encoder,
|
||||
@@ -25,25 +27,21 @@ import {
|
||||
oneof,
|
||||
ser_decode,
|
||||
ser_encode,
|
||||
u16,
|
||||
i16,
|
||||
i32,
|
||||
u8,
|
||||
sum_const_size,
|
||||
i8,
|
||||
type EncoderType,
|
||||
} from "./ser.ts";
|
||||
import {
|
||||
is_sql,
|
||||
sql,
|
||||
type FromSql,
|
||||
type SqlFragment,
|
||||
type ToSql,
|
||||
} from "./sql.ts";
|
||||
import {
|
||||
type CommandResult,
|
||||
is_sql,
|
||||
Query,
|
||||
type ResultStream,
|
||||
type Row,
|
||||
row_ctor,
|
||||
type RowConstructor,
|
||||
sql,
|
||||
type SqlFragment,
|
||||
type SqlTypeMap,
|
||||
text,
|
||||
} from "./query.ts";
|
||||
import { join } from "jsr:@std/path@^1.0.8";
|
||||
|
||||
@@ -119,11 +117,11 @@ function msg<T extends string, S extends ObjectShape>(
|
||||
shape: S
|
||||
): MessageEncoder<T, S> {
|
||||
const header_size = type !== "" ? 5 : 4;
|
||||
const ty = type !== "" ? oneof(char(u8), type) : null;
|
||||
const ty = type !== "" ? oneof(char(i8), type) : null;
|
||||
const fields = object(shape);
|
||||
|
||||
return {
|
||||
const_size: sum_const_size(header_size, fields.const_size),
|
||||
const_size: null,
|
||||
get type() {
|
||||
return type;
|
||||
},
|
||||
@@ -160,7 +158,7 @@ function msg_check_err(msg: Uint8Array) {
|
||||
|
||||
// https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS
|
||||
export const Header = object({
|
||||
type: char(u8),
|
||||
type: char(i8),
|
||||
length: i32,
|
||||
});
|
||||
|
||||
@@ -191,7 +189,7 @@ export const AuthenticationGSS = msg("R", {
|
||||
|
||||
export const AuthenticationGSSContinue = msg("R", {
|
||||
status: oneof(i32, 8 as const),
|
||||
data: byten_rest,
|
||||
data: bytes,
|
||||
});
|
||||
|
||||
export const AuthenticationSSPI = msg("R", {
|
||||
@@ -221,12 +219,12 @@ export const AuthenticationSASL = msg("R", {
|
||||
|
||||
export const AuthenticationSASLContinue = msg("R", {
|
||||
status: oneof(i32, 11 as const),
|
||||
data: byten_rest,
|
||||
data: bytes,
|
||||
});
|
||||
|
||||
export const AuthenticationSASLFinal = msg("R", {
|
||||
status: oneof(i32, 12 as const),
|
||||
data: byten_rest,
|
||||
data: bytes,
|
||||
});
|
||||
|
||||
export const BackendKeyData = msg("K", {
|
||||
@@ -237,9 +235,9 @@ export const BackendKeyData = msg("K", {
|
||||
export const Bind = msg("B", {
|
||||
portal: cstring,
|
||||
statement: cstring,
|
||||
param_formats: array(u16, u16),
|
||||
param_values: array(u16, byten_lp),
|
||||
column_formats: array(u16, u16),
|
||||
param_formats: array(i16, i16),
|
||||
param_values: array(i16, bytes_lp),
|
||||
column_formats: array(i16, i16),
|
||||
});
|
||||
|
||||
export const BindComplete = msg("2", {});
|
||||
@@ -251,43 +249,43 @@ export const CancelRequest = msg("", {
|
||||
});
|
||||
|
||||
export const Close = msg("C", {
|
||||
which: oneof(char(u8), "S" as const, "P" as const),
|
||||
which: oneof(char(i8), "S" as const, "P" as const),
|
||||
name: cstring,
|
||||
});
|
||||
|
||||
export const CloseComplete = msg("3", {});
|
||||
export const CommandComplete = msg("C", { tag: cstring });
|
||||
export const CopyData = msg("d", { data: byten_rest });
|
||||
export const CopyData = msg("d", { data: bytes });
|
||||
export const CopyDone = msg("c", {});
|
||||
export const CopyFail = msg("f", { cause: cstring });
|
||||
|
||||
export const CopyInResponse = msg("G", {
|
||||
format: u8,
|
||||
column_formats: array(u16, u16),
|
||||
format: i8,
|
||||
column_formats: array(i16, i16),
|
||||
});
|
||||
|
||||
export const CopyOutResponse = msg("H", {
|
||||
format: u8,
|
||||
column_formats: array(u16, u16),
|
||||
format: i8,
|
||||
column_formats: array(i16, i16),
|
||||
});
|
||||
|
||||
export const CopyBothResponse = msg("W", {
|
||||
format: u8,
|
||||
column_formats: array(u16, u16),
|
||||
format: i8,
|
||||
column_formats: array(i16, i16),
|
||||
});
|
||||
|
||||
export const DataRow = msg("D", {
|
||||
column_values: array(u16, byten_lp),
|
||||
column_values: array(i16, bytes_lp),
|
||||
});
|
||||
|
||||
export const Describe = msg("D", {
|
||||
which: oneof(char(u8), "S" as const, "P" as const),
|
||||
which: oneof(char(i8), "S" as const, "P" as const),
|
||||
name: cstring,
|
||||
});
|
||||
|
||||
export const EmptyQueryResponse = msg("I", {});
|
||||
|
||||
const err_field = char(u8);
|
||||
const err_field = char(i8);
|
||||
const err_fields: Encoder<Record<string, string>> = {
|
||||
const_size: null,
|
||||
allocs(x) {
|
||||
@@ -325,13 +323,13 @@ export const Flush = msg("H", {});
|
||||
|
||||
export const FunctionCall = msg("F", {
|
||||
oid: i32,
|
||||
arg_formats: array(u16, u16),
|
||||
arg_values: array(u16, byten_lp),
|
||||
result_format: u16,
|
||||
arg_formats: array(i16, i16),
|
||||
arg_values: array(i16, bytes_lp),
|
||||
result_format: i16,
|
||||
});
|
||||
|
||||
export const FunctionCallResponse = msg("V", {
|
||||
result_value: byten_lp,
|
||||
result_value: bytes_lp,
|
||||
});
|
||||
|
||||
export const NegotiateProtocolVersion = msg("v", {
|
||||
@@ -352,7 +350,7 @@ export const NotificationResponse = msg("A", {
|
||||
});
|
||||
|
||||
export const ParameterDescription = msg("t", {
|
||||
param_types: array(u16, i32),
|
||||
param_types: array(i16, i32),
|
||||
});
|
||||
|
||||
export const ParameterStatus = msg("S", {
|
||||
@@ -363,7 +361,7 @@ export const ParameterStatus = msg("S", {
|
||||
export const Parse = msg("P", {
|
||||
statement: cstring,
|
||||
query: cstring,
|
||||
param_types: array(u16, i32),
|
||||
param_types: array(i16, i32),
|
||||
});
|
||||
|
||||
export const ParseComplete = msg("1", {});
|
||||
@@ -379,31 +377,31 @@ export const QueryMessage = msg("Q", {
|
||||
});
|
||||
|
||||
export const ReadyForQuery = msg("Z", {
|
||||
tx_status: oneof(char(u8), "I" as const, "T" as const, "E" as const),
|
||||
tx_status: oneof(char(i8), "I" as const, "T" as const, "E" as const),
|
||||
});
|
||||
|
||||
export const RowDescription = msg("T", {
|
||||
columns: array(
|
||||
u16,
|
||||
i16,
|
||||
object({
|
||||
name: cstring,
|
||||
table_oid: i32,
|
||||
table_column: u16,
|
||||
table_column: i16,
|
||||
type_oid: i32,
|
||||
type_size: u16,
|
||||
type_size: i16,
|
||||
type_modifier: i32,
|
||||
format: u16,
|
||||
format: i16,
|
||||
})
|
||||
),
|
||||
});
|
||||
|
||||
export const SASLInitialResponse = msg("p", {
|
||||
mechanism: cstring,
|
||||
data: byten_lp,
|
||||
data: bytes_lp,
|
||||
});
|
||||
|
||||
export const SASLResponse = msg("p", {
|
||||
data: byten_rest,
|
||||
data: bytes,
|
||||
});
|
||||
|
||||
export const StartupMessage = msg("", {
|
||||
@@ -421,7 +419,7 @@ export const StartupMessage = msg("", {
|
||||
for (const { 0: key, 1: value } of Object.entries(x)) {
|
||||
cstring.encode(buf, cur, key), cstring.encode(buf, cur, value);
|
||||
}
|
||||
u8.encode(buf, cur, 0);
|
||||
i8.encode(buf, cur, 0);
|
||||
},
|
||||
decode(buf, cur) {
|
||||
const x: Record<string, string> = {};
|
||||
@@ -447,8 +445,7 @@ export interface WireOptions {
|
||||
readonly password: string;
|
||||
readonly database: string | null;
|
||||
readonly runtime_params: Record<string, string>;
|
||||
readonly from_sql: FromSql;
|
||||
readonly to_sql: ToSql;
|
||||
readonly types: SqlTypeMap;
|
||||
}
|
||||
|
||||
export type WireEvents = {
|
||||
@@ -531,8 +528,8 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
||||
(this.#connected = this.#auth()).catch(close);
|
||||
}
|
||||
|
||||
query(sql: SqlFragment): Query;
|
||||
query(s: TemplateStringsArray, ...xs: unknown[]): Query;
|
||||
query<T = Row>(sql: SqlFragment): Query<T>;
|
||||
query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
|
||||
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
|
||||
return this.#query(is_sql(s) ? s : sql(s, ...xs));
|
||||
}
|
||||
@@ -543,7 +540,8 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
||||
if (typeof f !== "undefined") {
|
||||
await using tx = await this.#begin();
|
||||
const value = await f(this, tx);
|
||||
return await tx.commit(), value;
|
||||
if (tx.open) await tx.commit();
|
||||
return value;
|
||||
} else {
|
||||
return this.#begin();
|
||||
}
|
||||
@@ -559,9 +557,9 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
||||
return this.#notify(channel, payload);
|
||||
}
|
||||
|
||||
async get(param: string, missing_null = true) {
|
||||
async get(param: string) {
|
||||
return (
|
||||
await this.query`select current_setting(${param}, ${missing_null})`
|
||||
await this.query`select current_setting(${param}, true)`
|
||||
.map(([s]) => String(s))
|
||||
.first_or(null)
|
||||
)[0];
|
||||
@@ -581,10 +579,15 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
|
||||
}
|
||||
}
|
||||
|
||||
const msg_PD = object({ P: Parse, D: Describe });
|
||||
const msg_BE = object({ B: Bind, E: Execute });
|
||||
const msg_BEc = object({ B: Bind, E: Execute, c: CopyDone });
|
||||
const msg_BEcC = object({ B: Bind, E: Execute, c: CopyDone, C: Close });
|
||||
|
||||
function wire_impl(
|
||||
wire: Wire,
|
||||
socket: Deno.Conn,
|
||||
{ user, database, password, runtime_params, from_sql, to_sql }: WireOptions
|
||||
{ user, database, password, runtime_params, types }: WireOptions
|
||||
) {
|
||||
const params: Parameters = Object.create(null);
|
||||
|
||||
@@ -611,6 +614,7 @@ function wire_impl(
|
||||
}
|
||||
|
||||
const read_recv = channel.receiver<Uint8Array>(async function read(send) {
|
||||
let err: unknown;
|
||||
try {
|
||||
let buf = new Uint8Array();
|
||||
for await (const chunk of read_socket()) {
|
||||
@@ -658,9 +662,10 @@ function wire_impl(
|
||||
}
|
||||
|
||||
if (buf.length !== 0) throw new WireError(`unexpected end of stream`);
|
||||
wire.emit("close");
|
||||
} catch (e) {
|
||||
wire.emit("close", e);
|
||||
throw ((err = e), e);
|
||||
} finally {
|
||||
wire.emit("close", err);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -692,23 +697,31 @@ function wire_impl(
|
||||
}
|
||||
|
||||
function pipeline_read<T>(r: () => T | PromiseLike<T>) {
|
||||
return rlock(async () => {
|
||||
return rlock(async function pipeline_read() {
|
||||
try {
|
||||
return await r();
|
||||
} finally {
|
||||
let msg;
|
||||
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type);
|
||||
({ tx_status } = ser_decode(ReadyForQuery, msg));
|
||||
try {
|
||||
let msg;
|
||||
while (msg_type((msg = await read_raw())) !== ReadyForQuery.type);
|
||||
({ tx_status } = ser_decode(ReadyForQuery, msg));
|
||||
} catch {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function pipeline_write<T>(w: () => T | PromiseLike<T>) {
|
||||
return wlock(async () => {
|
||||
return wlock(async function pipeline_write() {
|
||||
try {
|
||||
return await w();
|
||||
} finally {
|
||||
await write(Sync, {});
|
||||
try {
|
||||
await write(Sync, {});
|
||||
} catch {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -879,45 +892,43 @@ function wire_impl(
|
||||
const st_cache = new Map<string, Statement>();
|
||||
let st_ids = 0;
|
||||
|
||||
function st_get(query: string, param_types: number[]) {
|
||||
const key = JSON.stringify({ q: query, p: param_types });
|
||||
let st = st_cache.get(key);
|
||||
if (!st) st_cache.set(key, (st = new Statement(query, param_types)));
|
||||
return st;
|
||||
}
|
||||
|
||||
class Statement {
|
||||
readonly name = `__st${st_ids++}`;
|
||||
constructor(readonly query: string) {}
|
||||
|
||||
constructor(
|
||||
readonly query: string,
|
||||
readonly param_types: number[]
|
||||
) {}
|
||||
parse_task: Promise<{
|
||||
ser_params: ParameterSerializer;
|
||||
Row: RowConstructor;
|
||||
}> | null = null;
|
||||
|
||||
parse_task: Promise<RowConstructor> | null = null;
|
||||
parse() {
|
||||
return (this.parse_task ??= this.#parse());
|
||||
}
|
||||
|
||||
async #parse() {
|
||||
try {
|
||||
const { name, query, param_types } = this;
|
||||
return row_ctor(
|
||||
from_sql,
|
||||
await pipeline(
|
||||
async () => {
|
||||
await write(Parse, { statement: name, query, param_types });
|
||||
await write(Describe, { which: "S", name });
|
||||
},
|
||||
async () => {
|
||||
await read(ParseComplete);
|
||||
await read(ParameterDescription);
|
||||
const { name, query } = this;
|
||||
return await pipeline(
|
||||
() =>
|
||||
write(msg_PD, {
|
||||
P: { statement: name, query, param_types: [] },
|
||||
D: { which: "S", name },
|
||||
}),
|
||||
async () => {
|
||||
await read(ParseComplete);
|
||||
const param_desc = await read(ParameterDescription);
|
||||
|
||||
const msg = msg_check_err(await read_raw());
|
||||
if (msg_type(msg) === NoData.type) return [];
|
||||
else return ser_decode(RowDescription, msg).columns;
|
||||
}
|
||||
)
|
||||
const msg = msg_check_err(await read_raw());
|
||||
const row_desc =
|
||||
msg_type(msg) === NoData.type
|
||||
? { columns: [] }
|
||||
: ser_decode(RowDescription, msg);
|
||||
|
||||
return {
|
||||
ser_params: make_param_ser(param_desc),
|
||||
Row: make_row_ctor(row_desc),
|
||||
};
|
||||
}
|
||||
);
|
||||
} catch (e) {
|
||||
throw ((this.parse_task = null), e);
|
||||
@@ -930,6 +941,59 @@ function wire_impl(
|
||||
}
|
||||
}
|
||||
|
||||
type ParameterDescription = EncoderType<typeof ParameterDescription>;
|
||||
interface ParameterSerializer {
|
||||
(params: unknown[]): (string | null)[];
|
||||
}
|
||||
|
||||
function make_param_ser({ param_types }: ParameterDescription) {
|
||||
return jit.compiled<ParameterSerializer>`function ser_params(xs) {
|
||||
return [
|
||||
${jit.map(", ", param_types, (type_oid, i) => {
|
||||
const type = types[type_oid] ?? text;
|
||||
return jit`${type}.output(xs[${i}])`;
|
||||
})}
|
||||
];
|
||||
}`;
|
||||
}
|
||||
|
||||
type RowDescription = EncoderType<typeof RowDescription>;
|
||||
interface RowConstructor {
|
||||
new (columns: (BinaryLike | null)[]): Row;
|
||||
}
|
||||
|
||||
function make_row_ctor({ columns }: RowDescription) {
|
||||
const Row = jit.compiled<RowConstructor>`function Row(xs) {
|
||||
${jit.map(" ", columns, ({ name, type_oid }, i) => {
|
||||
const type = types[type_oid] ?? text;
|
||||
return jit`this[${name}] = xs[${i}] === null ? null : ${type}.input(${from_utf8}(xs[${i}]));`;
|
||||
})}
|
||||
}`;
|
||||
|
||||
Row.prototype = Object.create(null, {
|
||||
[Symbol.toStringTag]: {
|
||||
configurable: true,
|
||||
value: `Row`,
|
||||
},
|
||||
[Symbol.toPrimitive]: {
|
||||
configurable: true,
|
||||
value: function format() {
|
||||
return [...this].join("\t");
|
||||
},
|
||||
},
|
||||
[Symbol.iterator]: {
|
||||
configurable: true,
|
||||
value: jit.compiled`function* iter() {
|
||||
${jit.map(" ", columns, ({ name }) => {
|
||||
return jit`yield this[${name}];`;
|
||||
})}
|
||||
}`,
|
||||
},
|
||||
});
|
||||
|
||||
return Row;
|
||||
}
|
||||
|
||||
async function read_rows(
|
||||
Row: RowConstructor,
|
||||
stdout: WritableStream<Uint8Array> | null
|
||||
@@ -971,6 +1035,10 @@ function wire_impl(
|
||||
const { data } = ser_decode(CopyData, msg_check_err(msg));
|
||||
await writer.write(to_utf8(data));
|
||||
}
|
||||
await writer.close();
|
||||
} catch (e) {
|
||||
await writer.abort(e);
|
||||
throw e;
|
||||
} finally {
|
||||
writer.releaseLock();
|
||||
}
|
||||
@@ -982,17 +1050,13 @@ function wire_impl(
|
||||
async function write_copy_in(stream: ReadableStream<Uint8Array> | null) {
|
||||
if (stream !== null) {
|
||||
const reader = stream.getReader();
|
||||
let err;
|
||||
try {
|
||||
try {
|
||||
for (let next; !(next = await reader.read()).done; )
|
||||
await write(CopyData, { data: next.value });
|
||||
} catch (e) {
|
||||
err = e;
|
||||
} finally {
|
||||
if (typeof err === "undefined") await write(CopyDone, {});
|
||||
else await write(CopyFail, { cause: String(err) });
|
||||
}
|
||||
for (let next; !(next = await reader.read()).done; )
|
||||
await write(CopyData, { data: next.value });
|
||||
await write(CopyDone, {});
|
||||
} catch (e) {
|
||||
await write(CopyFail, { cause: String(e) });
|
||||
throw e;
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
@@ -1003,7 +1067,7 @@ function wire_impl(
|
||||
|
||||
async function* execute_fast(
|
||||
st: Statement,
|
||||
params: { types: number[]; values: (string | null)[] },
|
||||
params: unknown[],
|
||||
stdin: ReadableStream<Uint8Array> | null,
|
||||
stdout: WritableStream<Uint8Array> | null
|
||||
): ResultStream<Row> {
|
||||
@@ -1013,22 +1077,30 @@ function wire_impl(
|
||||
`executing query`
|
||||
);
|
||||
|
||||
const Row = await st.parse();
|
||||
const { ser_params, Row } = await st.parse();
|
||||
const param_values = ser_params(params);
|
||||
const portal = st.portal();
|
||||
|
||||
try {
|
||||
const { rows, tag } = await pipeline(
|
||||
async () => {
|
||||
await write(Bind, {
|
||||
const B = {
|
||||
portal,
|
||||
statement: st.name,
|
||||
param_formats: [],
|
||||
param_values: params.values,
|
||||
param_values,
|
||||
column_formats: [],
|
||||
});
|
||||
await write(Execute, { portal, row_limit: 0 });
|
||||
await write_copy_in(stdin);
|
||||
await write(Close, { which: "P" as const, name: portal });
|
||||
};
|
||||
const E = { portal, row_limit: 0 };
|
||||
const C = { which: "P" as const, name: portal };
|
||||
|
||||
if (stdin !== null) {
|
||||
await write(msg_BE, { B, E });
|
||||
await write_copy_in(stdin);
|
||||
await write(Close, C);
|
||||
} else {
|
||||
return write(msg_BEcC, { B, E, c: {}, C });
|
||||
}
|
||||
},
|
||||
async () => {
|
||||
await read(BindComplete);
|
||||
@@ -1039,10 +1111,14 @@ function wire_impl(
|
||||
if (rows.length) yield rows;
|
||||
return { tag };
|
||||
} catch (e) {
|
||||
await pipeline(
|
||||
() => write(Close, { which: "P" as const, name: portal }),
|
||||
() => read(CloseComplete)
|
||||
);
|
||||
try {
|
||||
await pipeline(
|
||||
() => write(Close, { which: "P" as const, name: portal }),
|
||||
() => read(CloseComplete)
|
||||
);
|
||||
} catch {
|
||||
// ignored
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
@@ -1050,7 +1126,7 @@ function wire_impl(
|
||||
|
||||
async function* execute_chunked(
|
||||
st: Statement,
|
||||
params: { types: number[]; values: (string | null)[] },
|
||||
params: unknown[],
|
||||
chunk_size: number,
|
||||
stdin: ReadableStream<Uint8Array> | null,
|
||||
stdout: WritableStream<Uint8Array> | null
|
||||
@@ -1061,21 +1137,28 @@ function wire_impl(
|
||||
`executing chunked query`
|
||||
);
|
||||
|
||||
const Row = await st.parse();
|
||||
const { ser_params, Row } = await st.parse();
|
||||
const param_values = ser_params(params);
|
||||
const portal = st.portal();
|
||||
|
||||
try {
|
||||
let { done, rows, tag } = await pipeline(
|
||||
async () => {
|
||||
await write(Bind, {
|
||||
const B = {
|
||||
portal,
|
||||
statement: st.name,
|
||||
param_formats: [],
|
||||
param_values: params.values,
|
||||
param_values,
|
||||
column_formats: [],
|
||||
});
|
||||
await write(Execute, { portal, row_limit: chunk_size });
|
||||
await write_copy_in(stdin);
|
||||
};
|
||||
const E = { portal, row_limit: chunk_size };
|
||||
|
||||
if (stdin !== null) {
|
||||
await write(msg_BE, { B, E });
|
||||
await write_copy_in(stdin);
|
||||
} else {
|
||||
return write(msg_BEc, { B, E, c: {} });
|
||||
}
|
||||
},
|
||||
async () => {
|
||||
await read(BindComplete);
|
||||
@@ -1104,8 +1187,9 @@ function wire_impl(
|
||||
}
|
||||
|
||||
function query(s: SqlFragment) {
|
||||
const { query, params } = sql.format(s, to_sql);
|
||||
const st = st_get(query, params.types);
|
||||
const { query, params } = sql.format(s);
|
||||
let st = st_cache.get(query);
|
||||
if (!st) st_cache.set(query, (st = new Statement(query)));
|
||||
|
||||
return new Query(({ chunk_size = 0, stdin = null, stdout = null }) =>
|
||||
chunk_size !== 0
|
||||
@@ -1269,8 +1353,8 @@ export class Pool
|
||||
}
|
||||
}
|
||||
|
||||
query(sql: SqlFragment): Query;
|
||||
query(s: TemplateStringsArray, ...xs: unknown[]): Query;
|
||||
query<T = Row>(sql: SqlFragment): Query<T>;
|
||||
query<T = Row>(s: TemplateStringsArray, ...xs: unknown[]): Query<T>;
|
||||
query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) {
|
||||
s = is_sql(s) ? s : sql(s, ...xs);
|
||||
const acquire = this.#acquire;
|
||||
@@ -1288,7 +1372,8 @@ export class Pool
|
||||
if (typeof f !== "undefined") {
|
||||
await using tx = await this.#begin();
|
||||
const value = await f(tx.wire, tx);
|
||||
return await tx.commit(), value;
|
||||
if (tx.open) await tx.commit();
|
||||
return value;
|
||||
} else {
|
||||
return this.#begin();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user