diff --git a/bench.ts b/bench.ts index 6a01888..31b83b2 100644 --- a/bench.ts +++ b/bench.ts @@ -1,4 +1,4 @@ -import * as pglue from "./mod.ts"; +import pglue from "./mod.ts"; import postgres_js from "https://deno.land/x/postgresjs/mod.js"; import * as deno_postgres from "https://deno.land/x/postgres/mod.ts"; diff --git a/deno.json b/deno.json index 22a49af..06e6a45 100644 --- a/deno.json +++ b/deno.json @@ -1,9 +1,5 @@ { "name": "@luaneko/pglue", "version": "0.1.0", - "exports": "./mod.ts", - "tasks": { - "test": "deno run --watch -A mod_test.ts", - "bench": "deno bench --watch -A" - } + "exports": "./mod.ts" } diff --git a/deno.lock b/deno.lock index 76a7720..c1ccb77 100644 --- a/deno.lock +++ b/deno.lock @@ -2,8 +2,11 @@ "version": "4", "specifiers": { "jsr:@badrap/valita@~0.4.2": "0.4.2", + "jsr:@std/assert@^1.0.10": "1.0.10", "jsr:@std/bytes@^1.0.4": "1.0.4", "jsr:@std/encoding@^1.0.6": "1.0.6", + "jsr:@std/expect@*": "1.0.11", + "jsr:@std/internal@^1.0.5": "1.0.5", "jsr:@std/path@^1.0.8": "1.0.8", "npm:pg-connection-string@^2.7.0": "2.7.0" }, @@ -11,12 +14,28 @@ "@badrap/valita@0.4.2": { "integrity": "af8a829e82eac71adbc7b60352798f94dcc66d19fab16b657957ca9e646c25fd" }, + "@std/assert@1.0.10": { + "integrity": "59b5cbac5bd55459a19045d95cc7c2ff787b4f8527c0dd195078ff6f9481fbb3", + "dependencies": [ + "jsr:@std/internal" + ] + }, "@std/bytes@1.0.4": { "integrity": "11a0debe522707c95c7b7ef89b478c13fb1583a7cfb9a85674cd2cc2e3a28abc" }, "@std/encoding@1.0.6": { "integrity": "ca87122c196e8831737d9547acf001766618e78cd8c33920776c7f5885546069" }, + "@std/expect@1.0.11": { + "integrity": "5aa5d5cf891e9a3249e45ea770de15189e5a2faee2122ee5746b10d1c310a19b", + "dependencies": [ + "jsr:@std/assert", + "jsr:@std/internal" + ] + }, + "@std/internal@1.0.5": { + "integrity": "54a546004f769c1ac9e025abd15a76b6671ddc9687e2313b67376125650dc7ba" + }, "@std/path@1.0.8": { "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be" } diff --git a/mod_test.ts b/mod_test.ts deleted file mode 100644 index f55213e..0000000 --- a/mod_test.ts +++ /dev/null @@ -1,23 +0,0 @@ -import postgres from "./mod.ts"; - -await using pool = postgres(`postgres://test:test@localhost:5432/test`, { - runtime_params: { client_min_messages: "INFO" }, -}); - -pool.on("log", (level, ctx, msg) => console.info(`${level}: ${msg}`, ctx)); - -await pool.begin(async (pg, tx) => { - await pg.query` - create table my_test ( - key integer primary key generated always as identity, - data text not null - ) - `; - - await pg.query` - insert into my_test (data) values (${[1, 2, 3]}::bytea) - `; - - console.log(await pg.query`select * from my_test`); - await tx.rollback(); -}); diff --git a/query.ts b/query.ts index 1c7956b..1a9fadf 100644 --- a/query.ts +++ b/query.ts @@ -127,10 +127,36 @@ export const bool: SqlType = { return s !== "f"; }, output(x) { - return typeof x === "undefined" || x === null ? null : x ? "t" : "f"; + if (typeof x === "undefined" || x === null) return null; + const b = bool_names[String(x).toLowerCase()]; + if (typeof b === "boolean") return b ? "t" : "f"; + else throw new SqlTypeError(`invalid bool output '${x}'`); }, }; +const bool_names: Partial> = { + // https://www.postgresql.org/docs/current/datatype-boolean.html#DATATYPE-BOOLEAN + t: true, + tr: true, + tru: true, + true: true, + y: true, + ye: true, + yes: true, + on: true, + 1: true, + f: false, + fa: false, + fal: false, + fals: false, + false: false, + n: false, + no: false, + of: false, + off: false, + 0: false, +}; + export const text: SqlType = { input(s) { return s; @@ -401,7 +427,7 @@ export class Query async first(): Promise> { const { rows, tag } = await this.collect(1); - if (!rows.length) throw new Error(`expected one row, got none instead`); + if (!rows.length) throw new TypeError(`expected one row, got none instead`); const row = rows[0]; return Object.assign([row] as const, { row: rows[0], tag }); } @@ -417,8 +443,9 @@ export class Query let next; const rows = []; for (let i = 0; !(next = await iter.next()).done; ) { - const { value: c } = next; - for (let j = 0, n = c.length; i < count && j < n; ) rows[i++] = c[j++]; + const chunk = next.value; + for (let j = 0, n = chunk.length; i < count && j < n; ) + rows[i++] = chunk[j++]; } return Object.assign(rows, next.value, { rows }); } @@ -453,7 +480,8 @@ function str_to_stream(s: string) { return new ReadableStream({ type: "bytes", start(c) { - c.enqueue(to_utf8(s)), c.close(); + if (s.length !== 0) c.enqueue(to_utf8(s)); + c.close(); }, }); } diff --git a/test.ts b/test.ts new file mode 100644 index 0000000..5c6a36b --- /dev/null +++ b/test.ts @@ -0,0 +1,181 @@ +import pglue, { PostgresError, SqlTypeError } from "./mod.ts"; +import { expect } from "jsr:@std/expect"; + +async function connect() { + const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, { + runtime_params: { client_min_messages: "INFO" }, + }); + + return pg.on("log", (_level, ctx, msg) => { + console.info(`${msg}`, ctx); + }); +} + +Deno.test(`integers`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + const [{ a, b, c }] = await pg.query` + select + ${"0x100"}::int2 as a, + ${777}::int4 as b, + ${{ + [Symbol.toPrimitive](hint: string) { + expect(hint).toBe("number"); + return "1234"; + }, + }}::int8 as c + `.first(); + + expect(a).toBe(0x100); + expect(b).toBe(777); + expect(c).toBe(1234); + + const [{ large }] = + await pg.query`select ${"10000000000000000"}::int8 as large`.first(); + + expect(large).toBe(10000000000000000n); + + await expect(pg.query`select ${100000}::int2`).rejects.toThrow(SqlTypeError); + await expect(pg.query`select ${"100000"}::text::int2`).rejects.toThrow( + PostgresError + ); +}); + +Deno.test(`boolean`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + const [{ a, b, c }] = await pg.query` + select + ${true}::bool as a, + ${"n"}::bool as b, + ${undefined}::bool as c + `.first(); + + expect(a).toBe(true); + expect(b).toBe(false); + expect(c).toBe(null); +}); + +Deno.test(`bytea`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + const [{ string, array, buffer }] = await pg.query` + select + ${"hello, world"}::bytea as string, + ${[1, 2, 3, 4, 5]}::bytea as array, + ${Uint8Array.of(5, 4, 3, 2, 1)}::bytea as buffer + `.first(); + + expect(string).toEqual(new TextEncoder().encode("hello, world")); + expect(array).toEqual(Uint8Array.of(1, 2, 3, 4, 5)); + expect(buffer).toEqual(Uint8Array.of(5, 4, 3, 2, 1)); +}); + +Deno.test(`row`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + expect( + ( + await pg.query`create table my_table (a text not null, b text not null, c text not null)` + ).tag + ).toBe(`CREATE TABLE`); + + expect( + ( + await pg.query`copy my_table from stdin`.stdin( + `field a\tfield b\tfield c` + ) + ).tag + ).toBe(`COPY 1`); + + const [row] = await pg.query`select * from my_table`.first(); + { + // columns by name + const { a, b, c } = row; + expect(a).toBe("field a"); + expect(b).toBe("field b"); + expect(c).toBe("field c"); + } + { + // columns by index + const [a, b, c] = row; + expect(a).toBe("field a"); + expect(b).toBe("field b"); + expect(c).toBe("field c"); + } +}); + +Deno.test(`sql injection`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + const input = `injection'); drop table users; --`; + + expect((await pg.query`create table users (name text not null)`).tag).toBe( + `CREATE TABLE` + ); + + expect((await pg.query`insert into users (name) values (${input})`).tag).toBe( + `INSERT 0 1` + ); + + const [{ name }] = await pg.query<{ name: string }>` + select name from users + `.first(); + + expect(name).toBe(input); +}); + +Deno.test(`pubsub`, async () => { + await using pg = await connect(); + const sent: string[] = []; + + await using ch = await pg.listen(`my channel`, (payload) => { + expect(payload).toBe(sent.shift()); + }); + + for (let i = 0; i < 5; i++) { + const payload = `test payload ${i}`; + sent.push(payload); + await ch.notify(payload); + } +}); + +Deno.test(`transactions`, async () => { + await using pg = await connect(); + + await pg.begin(async (pg) => { + await pg.begin(async (pg, tx) => { + await pg.query`create table my_table (field text not null)`; + await tx.rollback(); + }); + + await expect(pg.query`select * from my_table`).rejects.toThrow( + PostgresError + ); + }); + + await expect(pg.query`select * from my_table`).rejects.toThrow(PostgresError); + + await pg.begin(async (pg) => { + await pg.begin(async (pg, tx) => { + await pg.begin(async (pg, tx) => { + await pg.begin(async (pg) => { + await pg.query`create table my_table (field text not null)`; + }); + await tx.commit(); + }); + + expect(await pg.query`select * from my_table`.count()).toBe(0); + await tx.rollback(); + }); + + await expect(pg.query`select * from my_table`).rejects.toThrow( + PostgresError + ); + }); +}); diff --git a/wire.ts b/wire.ts index 5587dbb..6c93d90 100644 --- a/wire.ts +++ b/wire.ts @@ -528,8 +528,8 @@ export class Wire extends TypedEmitter implements Disposable { (this.#connected = this.#auth()).catch(close); } - query(sql: SqlFragment): Query; - query(s: TemplateStringsArray, ...xs: unknown[]): Query; + query(sql: SqlFragment): Query; + query(s: TemplateStringsArray, ...xs: unknown[]): Query; query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) { return this.#query(is_sql(s) ? s : sql(s, ...xs)); } @@ -557,9 +557,9 @@ export class Wire extends TypedEmitter implements Disposable { return this.#notify(channel, payload); } - async get(param: string, missing_null = true) { + async get(param: string) { return ( - await this.query`select current_setting(${param}, ${missing_null})` + await this.query`select current_setting(${param}, true)` .map(([s]) => String(s)) .first_or(null) )[0]; @@ -609,6 +609,7 @@ function wire_impl( } const read_recv = channel.receiver(async function read(send) { + let err: unknown; try { let buf = new Uint8Array(); for await (const chunk of read_socket()) { @@ -656,9 +657,10 @@ function wire_impl( } if (buf.length !== 0) throw new WireError(`unexpected end of stream`); - wire.emit("close"); } catch (e) { - wire.emit("close", e); + throw ((err = e), e); + } finally { + wire.emit("close", err); } }); @@ -690,23 +692,31 @@ function wire_impl( } function pipeline_read(r: () => T | PromiseLike) { - return rlock(async () => { + return rlock(async function pipeline_read() { try { return await r(); } finally { - let msg; - while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); - ({ tx_status } = ser_decode(ReadyForQuery, msg)); + try { + let msg; + while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); + ({ tx_status } = ser_decode(ReadyForQuery, msg)); + } catch { + // ignored + } } }); } function pipeline_write(w: () => T | PromiseLike) { - return wlock(async () => { + return wlock(async function pipeline_write() { try { return await w(); } finally { - await write(Sync, {}); + try { + await write(Sync, {}); + } catch { + // ignored + } } }); } @@ -1088,10 +1098,14 @@ function wire_impl( if (rows.length) yield rows; return { tag }; } catch (e) { - await pipeline( - () => write(Close, { which: "P" as const, name: portal }), - () => read(CloseComplete) - ); + try { + await pipeline( + () => write(Close, { which: "P" as const, name: portal }), + () => read(CloseComplete) + ); + } catch { + // ignored + } throw e; } @@ -1320,8 +1334,8 @@ export class Pool } } - query(sql: SqlFragment): Query; - query(s: TemplateStringsArray, ...xs: unknown[]): Query; + query(sql: SqlFragment): Query; + query(s: TemplateStringsArray, ...xs: unknown[]): Query; query(s: TemplateStringsArray | SqlFragment, ...xs: unknown[]) { s = is_sql(s) ? s : sql(s, ...xs); const acquire = this.#acquire;