diff --git a/wire.ts b/wire.ts index 9467bd4..00d18c2 100644 --- a/wire.ts +++ b/wire.ts @@ -8,7 +8,6 @@ import { from_utf8, jit, semaphore, - semaphore_fast, to_base64, to_utf8, TypedEmitter, @@ -684,8 +683,8 @@ function wire_impl( } // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-PIPELINING - const rlock = semaphore_fast(); - const wlock = semaphore_fast(); + const rlock = semaphore(); + const wlock = semaphore(); function pipeline( w: () => void | PromiseLike, @@ -697,38 +696,39 @@ function wire_impl( }); } - function pipeline_read(r: () => T | PromiseLike) { - return rlock(async function pipeline_read() { + async function pipeline_read(r: () => T | PromiseLike) { + using _rlock = await rlock(); + try { + return await r(); + } finally { try { - return await r(); - } finally { - try { - let msg; - while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); - ({ tx_status } = ser_decode(ReadyForQuery, msg)); - } catch { - // ignored - } + let msg; + while (msg_type((msg = await read_raw())) !== ReadyForQuery.type); + ({ tx_status } = ser_decode(ReadyForQuery, msg)); + } catch { + // ignored } - }); + } } - function pipeline_write(w: () => T | PromiseLike) { - return wlock(async function pipeline_write() { + async function pipeline_write(w: () => T | PromiseLike) { + using _wlock = await wlock(); + try { + return await w(); + } finally { try { - return await w(); - } finally { - try { - await write(Sync, {}); - } catch { - // ignored - } + await write(Sync, {}); + } catch { + // ignored } - }); + } } // https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-START-UP async function auth() { + using _rlock = await rlock(); + using _wlock = await wlock(); + await write(StartupMessage, { version: 196608, params: {