From cefe14b9dcf5a0f633f3118e90d775d5dfa95501 Mon Sep 17 00:00:00 2001 From: luaneko Date: Sat, 11 Jan 2025 00:28:14 +1100 Subject: [PATCH] Ensure stdout writable stream is always closed --- deno.json | 2 +- deno.lock | 8 ++++++++ test.ts | 9 +++++++++ wire.ts | 20 ++++++++++---------- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/deno.json b/deno.json index 06e6a45..fb8ca00 100644 --- a/deno.json +++ b/deno.json @@ -1,5 +1,5 @@ { "name": "@luaneko/pglue", - "version": "0.1.0", + "version": "0.1.1", "exports": "./mod.ts" } diff --git a/deno.lock b/deno.lock index c1ccb77..60a5e31 100644 --- a/deno.lock +++ b/deno.lock @@ -3,11 +3,13 @@ "specifiers": { "jsr:@badrap/valita@~0.4.2": "0.4.2", "jsr:@std/assert@^1.0.10": "1.0.10", + "jsr:@std/bytes@^1.0.3": "1.0.4", "jsr:@std/bytes@^1.0.4": "1.0.4", "jsr:@std/encoding@^1.0.6": "1.0.6", "jsr:@std/expect@*": "1.0.11", "jsr:@std/internal@^1.0.5": "1.0.5", "jsr:@std/path@^1.0.8": "1.0.8", + "jsr:@std/streams@*": "1.0.8", "npm:pg-connection-string@^2.7.0": "2.7.0" }, "jsr": { @@ -38,6 +40,12 @@ }, "@std/path@1.0.8": { "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be" + }, + "@std/streams@1.0.8": { + "integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3", + "dependencies": [ + "jsr:@std/bytes@^1.0.3" + ] } }, "npm": { diff --git a/test.ts b/test.ts index 5c6a36b..2f75455 100644 --- a/test.ts +++ b/test.ts @@ -1,5 +1,6 @@ import pglue, { PostgresError, SqlTypeError } from "./mod.ts"; import { expect } from "jsr:@std/expect"; +import { toText } from "jsr:@std/streams"; async function connect() { const pg = await pglue.connect(`postgres://test:test@localhost:5432/test`, { @@ -107,6 +108,14 @@ Deno.test(`row`, async () => { expect(b).toBe("field b"); expect(c).toBe("field c"); } + + const { readable, writable } = new TransformStream( + {}, + new ByteLengthQueuingStrategy({ highWaterMark: 4096 }), + new ByteLengthQueuingStrategy({ highWaterMark: 4096 }) + ); + await pg.query`copy my_table to stdout`.stdout(writable); + expect(await toText(readable)).toBe(`field a\tfield b\tfield c\n`); }); Deno.test(`sql injection`, async () => { diff --git a/wire.ts b/wire.ts index 6c93d90..762068b 100644 --- a/wire.ts +++ b/wire.ts @@ -1029,6 +1029,10 @@ function wire_impl( const { data } = ser_decode(CopyData, msg_check_err(msg)); await writer.write(to_utf8(data)); } + await writer.close(); + } catch (e) { + await writer.abort(e); + throw e; } finally { writer.releaseLock(); } @@ -1040,17 +1044,13 @@ function wire_impl( async function write_copy_in(stream: ReadableStream | null) { if (stream !== null) { const reader = stream.getReader(); - let err; try { - try { - for (let next; !(next = await reader.read()).done; ) - await write(CopyData, { data: next.value }); - } catch (e) { - err = e; - } finally { - if (typeof err === "undefined") await write(CopyDone, {}); - else await write(CopyFail, { cause: String(err) }); - } + for (let next; !(next = await reader.read()).done; ) + await write(CopyData, { data: next.value }); + await write(CopyDone, {}); + } catch (e) { + await write(CopyFail, { cause: String(e) }); + throw e; } finally { reader.releaseLock(); }