diff --git a/query.ts b/query.ts index 1a9fadf..e0bde7f 100644 --- a/query.ts +++ b/query.ts @@ -345,6 +345,7 @@ export interface Row extends Iterable { } export interface QueryOptions { + readonly simple: boolean; readonly chunk_size: number; readonly stdin: ReadableStream | null; readonly stdout: WritableStream | null; @@ -359,6 +360,11 @@ export class Query this.#f = f; } + simple(simple = true) { + const f = this.#f; + return new Query((o) => f({ simple, ...o })); + } + chunked(chunk_size = 1) { const f = this.#f; return new Query((o) => f({ chunk_size, ...o })); diff --git a/test.ts b/test.ts index 6dff5e3..8fa6fb0 100644 --- a/test.ts +++ b/test.ts @@ -2,9 +2,9 @@ import pglue, { PostgresError, SqlTypeError } from "./mod.ts"; import { expect } from "jsr:@std/expect"; import { toText } from "jsr:@std/streams"; -async function connect() { +async function connect(params?: Record) { const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, { - runtime_params: { client_min_messages: "INFO" }, + runtime_params: { client_min_messages: "INFO", ...params }, }); return pg.on("log", (_level, ctx, msg) => { @@ -139,7 +139,7 @@ Deno.test(`sql injection`, async () => { expect(name).toBe(input); }); -Deno.test(`pubsub`, async () => { +Deno.test(`listen/notify`, async () => { await using pg = await connect(); const sent: string[] = []; @@ -152,6 +152,8 @@ Deno.test(`pubsub`, async () => { sent.push(payload); await ch.notify(payload); } + + expect(sent.length).toBe(0); }); Deno.test(`transactions`, async () => { @@ -195,15 +197,35 @@ Deno.test(`streaming`, async () => { await pg.query`create table my_table (field text not null)`; - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 20; i++) { await pg.query`insert into my_table (field) values (${i})`; } let i = 0; - for await (const chunk of pg.query`select * from my_table`.chunked(10)) { - expect(chunk.length).toBe(10); + for await (const chunk of pg.query`select * from my_table`.chunked(5)) { + expect(chunk.length).toBe(5); for (const row of chunk) expect(row.field).toBe(`${i++}`); } - expect(i).toBe(100); + expect(i).toBe(20); +}); + +Deno.test(`simple`, async () => { + await using pg = await connect(); + await using _tx = await pg.begin(); + + const rows = await pg.query` + create table my_table (field text not null); + insert into my_table (field) values ('one'), ('two'), ('three'); + select * from my_table; + select * from my_table where field = 'two'; + `.simple(); + + expect(rows.length).toBe(4); + + const [{ field: a }, { field: b }, { field: c }, { field: d }] = rows; + expect(a).toBe("one"); + expect(b).toBe("two"); + expect(c).toBe("three"); + expect(d).toBe("two"); }); diff --git a/wire.ts b/wire.ts index 9f3f4ff..9467bd4 100644 --- a/wire.ts +++ b/wire.ts @@ -34,6 +34,7 @@ import { } from "./ser.ts"; import { type CommandResult, + format, is_sql, Query, type ResultStream, @@ -916,18 +917,15 @@ function wire_impl( }), async () => { await read(ParseComplete); - const param_desc = await read(ParameterDescription); + const ser_params = make_param_ser(await read(ParameterDescription)); const msg = msg_check_err(await read_raw()); - const row_desc = + const Row = msg_type(msg) === NoData.type - ? { columns: [] } - : ser_decode(RowDescription, msg); + ? EmptyRow + : make_row_ctor(ser_decode(RowDescription, msg)); - return { - ser_params: make_param_ser(param_desc), - Row: make_row_ctor(row_desc), - }; + return { ser_params, Row }; } ); } catch (e) { @@ -962,6 +960,7 @@ function wire_impl( new (columns: (BinaryLike | null)[]): Row; } + const EmptyRow = make_row_ctor({ columns: [] }); function make_row_ctor({ columns }: RowDescription) { const Row = jit.compiled`function Row(xs) { ${jit.map(" ", columns, ({ name, type_oid }, i) => { @@ -1017,11 +1016,15 @@ function wire_impl( case EmptyQueryResponse.type: return { done: true as const, rows, tag: "" }; + case RowDescription.type: + Row = make_row_ctor(ser_decode(RowDescription, msg)); + continue; + case CopyInResponse.type: continue; case CopyOutResponse.type: - await read_copy_out(stdout); + await read_copy_out(stdout), (stdout = null); continue; } } @@ -1065,6 +1068,53 @@ function wire_impl( } } + async function* execute_simple( + query: string, + stdin: ReadableStream | null, + stdout: WritableStream | null + ): ResultStream { + log("debug", { query: query }, `executing simple query`); + + const { chunks, err } = await pipeline( + async () => { + await write(QueryMessage, { query }); + return write_copy_in(stdin); + }, + async () => { + for (let chunks = [], err; ; ) { + const msg = await read_raw(); + switch (msg_type(msg)) { + default: + case ReadyForQuery.type: + return { chunks, err }; + + case RowDescription.type: { + const Row = make_row_ctor(ser_decode(RowDescription, msg)); + const { rows } = await read_rows(Row, stdout); + chunks.push(rows); + stdout = null; + continue; + } + + case EmptyQueryResponse.type: + case CommandComplete.type: + continue; + + case ErrorResponse.type: { + const { fields } = ser_decode(ErrorResponse, msg); + err = new PostgresError(fields); + continue; + } + } + } + } + ); + + yield* chunks; + if (err) throw err; + return { tag: "" }; + } + async function* execute_fast( st: Statement, params: unknown[], @@ -1097,7 +1147,7 @@ function wire_impl( if (stdin !== null) { await write(msg_BE, { B, E }); await write_copy_in(stdin); - await write(Close, C); + return write(Close, C); } else { return write(msg_BEcC, { B, E, c: {}, C }); } @@ -1155,7 +1205,7 @@ function wire_impl( if (stdin !== null) { await write(msg_BE, { B, E }); - await write_copy_in(stdin); + return write_copy_in(stdin); } else { return write(msg_BEc, { B, E, c: {} }); } @@ -1186,15 +1236,20 @@ function wire_impl( } } - function query(s: SqlFragment) { - const { query, params } = sql.format(s); - let st = st_cache.get(query); - if (!st) st_cache.set(query, (st = new Statement(query))); + function query(sql: SqlFragment) { + return new Query( + ({ simple = false, chunk_size = 0, stdin = null, stdout = null }) => { + const { query, params } = format(sql); + if (simple) { + if (!params.length) return execute_simple(query, stdin, stdout); + else throw new WireError(`simple query cannot be parameterised`); + } - return new Query(({ chunk_size = 0, stdin = null, stdout = null }) => - chunk_size !== 0 - ? execute_chunked(st, params, chunk_size, stdin, stdout) - : execute_fast(st, params, stdin, stdout) + let st = st_cache.get(query); + if (!st) st_cache.set(query, (st = new Statement(query))); + if (!chunk_size) return execute_fast(st, params, stdin, stdout); + else return execute_chunked(st, params, chunk_size, stdin, stdout); + } ); }