Ensure stdout writable stream is always closed

This commit is contained in:
luaneko 2025-01-11 00:28:14 +11:00
parent 4e68e34fd0
commit cefe14b9dc
Signed by: luaneko
GPG Key ID: 406809B8763FF07A
4 changed files with 28 additions and 11 deletions

View File

@ -1,5 +1,5 @@
{ {
"name": "@luaneko/pglue", "name": "@luaneko/pglue",
"version": "0.1.0", "version": "0.1.1",
"exports": "./mod.ts" "exports": "./mod.ts"
} }

8
deno.lock generated
View File

@ -3,11 +3,13 @@
"specifiers": { "specifiers": {
"jsr:@badrap/valita@~0.4.2": "0.4.2", "jsr:@badrap/valita@~0.4.2": "0.4.2",
"jsr:@std/assert@^1.0.10": "1.0.10", "jsr:@std/assert@^1.0.10": "1.0.10",
"jsr:@std/bytes@^1.0.3": "1.0.4",
"jsr:@std/bytes@^1.0.4": "1.0.4", "jsr:@std/bytes@^1.0.4": "1.0.4",
"jsr:@std/encoding@^1.0.6": "1.0.6", "jsr:@std/encoding@^1.0.6": "1.0.6",
"jsr:@std/expect@*": "1.0.11", "jsr:@std/expect@*": "1.0.11",
"jsr:@std/internal@^1.0.5": "1.0.5", "jsr:@std/internal@^1.0.5": "1.0.5",
"jsr:@std/path@^1.0.8": "1.0.8", "jsr:@std/path@^1.0.8": "1.0.8",
"jsr:@std/streams@*": "1.0.8",
"npm:pg-connection-string@^2.7.0": "2.7.0" "npm:pg-connection-string@^2.7.0": "2.7.0"
}, },
"jsr": { "jsr": {
@ -38,6 +40,12 @@
}, },
"@std/path@1.0.8": { "@std/path@1.0.8": {
"integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be" "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
},
"@std/streams@1.0.8": {
"integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3",
"dependencies": [
"jsr:@std/bytes@^1.0.3"
]
} }
}, },
"npm": { "npm": {

View File

@ -1,5 +1,6 @@
import pglue, { PostgresError, SqlTypeError } from "./mod.ts"; import pglue, { PostgresError, SqlTypeError } from "./mod.ts";
import { expect } from "jsr:@std/expect"; import { expect } from "jsr:@std/expect";
import { toText } from "jsr:@std/streams";
async function connect() { async function connect() {
const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, { const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, {
@ -107,6 +108,14 @@ Deno.test(`row`, async () => {
expect(b).toBe("field b"); expect(b).toBe("field b");
expect(c).toBe("field c"); expect(c).toBe("field c");
} }
const { readable, writable } = new TransformStream<Uint8Array>(
{},
new ByteLengthQueuingStrategy({ highWaterMark: 4096 }),
new ByteLengthQueuingStrategy({ highWaterMark: 4096 })
);
await pg.query`copy my_table to stdout`.stdout(writable);
expect(await toText(readable)).toBe(`field a\tfield b\tfield c\n`);
}); });
Deno.test(`sql injection`, async () => { Deno.test(`sql injection`, async () => {

14
wire.ts
View File

@ -1029,6 +1029,10 @@ function wire_impl(
const { data } = ser_decode(CopyData, msg_check_err(msg)); const { data } = ser_decode(CopyData, msg_check_err(msg));
await writer.write(to_utf8(data)); await writer.write(to_utf8(data));
} }
await writer.close();
} catch (e) {
await writer.abort(e);
throw e;
} finally { } finally {
writer.releaseLock(); writer.releaseLock();
} }
@ -1040,17 +1044,13 @@ function wire_impl(
async function write_copy_in(stream: ReadableStream<Uint8Array> | null) { async function write_copy_in(stream: ReadableStream<Uint8Array> | null) {
if (stream !== null) { if (stream !== null) {
const reader = stream.getReader(); const reader = stream.getReader();
let err;
try {
try { try {
for (let next; !(next = await reader.read()).done; ) for (let next; !(next = await reader.read()).done; )
await write(CopyData, { data: next.value }); await write(CopyData, { data: next.value });
await write(CopyDone, {});
} catch (e) { } catch (e) {
err = e; await write(CopyFail, { cause: String(e) });
} finally { throw e;
if (typeof err === "undefined") await write(CopyDone, {});
else await write(CopyFail, { cause: String(err) });
}
} finally { } finally {
reader.releaseLock(); reader.releaseLock();
} }