Compare commits

...

2 Commits

Author SHA1 Message Date
b194397645
Optimise query request write 2025-01-11 00:46:17 +11:00
858b7a95f3
Add streaming query testing code 2025-01-11 00:39:00 +11:00
4 changed files with 54 additions and 16 deletions

View File

@ -14,9 +14,9 @@ The glue for TypeScript to PostgreSQL.
## Installation ## Installation
```ts ```ts
import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/0.1.1/mod.ts"; import pglue from "https://git.lua.re/luaneko/pglue/raw/tag/0.1.2/mod.ts";
// ...or from github: // ...or from github:
import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/0.1.1/mod.ts"; import pglue from "https://raw.githubusercontent.com/luaneko/pglue/refs/tags/0.1.2/mod.ts";
``` ```
## Documentation ## Documentation

View File

@ -1,5 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.1.1", "version": "0.1.2",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

19
test.ts
View File

@ -188,3 +188,22 @@ Deno.test(`transactions`, async () => {
); );
}); });
}); });
Deno.test(`streaming`, async () => {
await using pg = await connect();
await using _tx = await pg.begin();
await pg.query`create table my_table (field text not null)`;
for (let i = 0; i < 100; i++) {
await pg.query`insert into my_table (field) values (${i})`;
}
let i = 0;
for await (const chunk of pg.query`select * from my_table`.chunked(10)) {
expect(chunk.length).toBe(10);
for (const row of chunk) expect(row.field).toBe(`${i++}`);
}
expect(i).toBe(100);
});

45
wire.ts
View File

@ -579,6 +579,11 @@ export class Wire extends TypedEmitter<WireEvents> implements Disposable {
} }
} }
const msg_PD = object({ P: Parse, D: Describe });
const msg_BE = object({ B: Bind, E: Execute });
const msg_BEc = object({ B: Bind, E: Execute, c: CopyDone });
const msg_BEcC = object({ B: Bind, E: Execute, c: CopyDone, C: Close });
function wire_impl( function wire_impl(
wire: Wire, wire: Wire,
socket: Deno.Conn, socket: Deno.Conn,
@ -904,10 +909,11 @@ function wire_impl(
try { try {
const { name, query } = this; const { name, query } = this;
return await pipeline( return await pipeline(
async () => { () =>
await write(Parse, { statement: name, query, param_types: [] }); write(msg_PD, {
await write(Describe, { which: "S", name }); P: { statement: name, query, param_types: [] },
}, D: { which: "S", name },
}),
async () => { async () => {
await read(ParseComplete); await read(ParseComplete);
const param_desc = await read(ParameterDescription); const param_desc = await read(ParameterDescription);
@ -1078,16 +1084,23 @@ function wire_impl(
try { try {
const { rows, tag } = await pipeline( const { rows, tag } = await pipeline(
async () => { async () => {
await write(Bind, { const B = {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}); };
await write(Execute, { portal, row_limit: 0 }); const E = { portal, row_limit: 0 };
await write_copy_in(stdin); const C = { which: "P" as const, name: portal };
await write(Close, { which: "P" as const, name: portal });
if (stdin !== null) {
await write(msg_BE, { B, E });
await write_copy_in(stdin);
await write(Close, C);
} else {
return write(msg_BEcC, { B, E, c: {}, C });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);
@ -1131,15 +1144,21 @@ function wire_impl(
try { try {
let { done, rows, tag } = await pipeline( let { done, rows, tag } = await pipeline(
async () => { async () => {
await write(Bind, { const B = {
portal, portal,
statement: st.name, statement: st.name,
param_formats: [], param_formats: [],
param_values, param_values,
column_formats: [], column_formats: [],
}); };
await write(Execute, { portal, row_limit: chunk_size }); const E = { portal, row_limit: chunk_size };
await write_copy_in(stdin);
if (stdin !== null) {
await write(msg_BE, { B, E });
await write_copy_in(stdin);
} else {
return write(msg_BEc, { B, E, c: {} });
}
}, },
async () => { async () => {
await read(BindComplete); await read(BindComplete);