Log query inside pipeline handler

This commit is contained in:
luaneko 2025-01-11 07:24:23 +11:00
parent 02f8098811
commit 29b2796627
Signed by: luaneko
GPG Key ID: 406809B8763FF07A

28
wire.ts
View File

@ -91,7 +91,7 @@ export class PostgresError extends WireError {
} }
} }
function severity_level(s: string): LogLevel { function severity_log_level(s: string): LogLevel {
switch (s) { switch (s) {
case "DEBUG": case "DEBUG":
return "debug"; return "debug";
@ -628,7 +628,7 @@ function wire_impl(
case NoticeResponse.type: { case NoticeResponse.type: {
const { fields } = ser_decode(NoticeResponse, msg); const { fields } = ser_decode(NoticeResponse, msg);
const notice = new PostgresError(fields); const notice = new PostgresError(fields);
log(severity_level(notice.severity), notice, notice.message); log(severity_log_level(notice.severity), notice, notice.message);
wire.emit("notice", notice); wire.emit("notice", notice);
return true; return true;
} }
@ -1101,10 +1101,12 @@ function wire_impl(
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): ResultStream<Row> {
log("debug", { query: query }, `executing simple query`);
yield* await pipeline( yield* await pipeline(
() => (write(QueryMessage, { query }), write_copy_in(stdin)), () => {
log("debug", { query }, `executing simple query`);
write(QueryMessage, { query });
write_copy_in(stdin);
},
async () => { async () => {
for (let chunks = [], err; ; ) { for (let chunks = [], err; ; ) {
const msg = await read_msg(); const msg = await read_msg();
@ -1145,12 +1147,7 @@ function wire_impl(
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): ResultStream<Row> {
log( const { query, name: statement } = st;
"debug",
{ query: st.query, statement: st.name, params },
`executing query`
);
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
const portal = st.portal(); const portal = st.portal();
@ -1158,6 +1155,7 @@ function wire_impl(
try { try {
const { rows, tag } = await pipeline( const { rows, tag } = await pipeline(
async () => { async () => {
log("debug", { query, statement, params }, `executing query`);
write(Bind, { write(Bind, {
portal, portal,
statement: st.name, statement: st.name,
@ -1198,12 +1196,7 @@ function wire_impl(
stdin: ReadableStream<Uint8Array> | null, stdin: ReadableStream<Uint8Array> | null,
stdout: WritableStream<Uint8Array> | null stdout: WritableStream<Uint8Array> | null
): ResultStream<Row> { ): ResultStream<Row> {
log( const { query, name: statement } = st;
"debug",
{ query: st.query, statement: st.name, params },
`executing chunked query`
);
const { ser_params, Row } = await st.parse(); const { ser_params, Row } = await st.parse();
const param_values = ser_params(params); const param_values = ser_params(params);
const portal = st.portal(); const portal = st.portal();
@ -1211,6 +1204,7 @@ function wire_impl(
try { try {
let { done, rows, tag } = await pipeline( let { done, rows, tag } = await pipeline(
() => { () => {
log("debug", { query, statement, params }, `executing chunked query`);
write(Bind, { write(Bind, {
portal, portal,
statement: st.name, statement: st.name,