lstd/async.ts

154 lines
3.4 KiB
TypeScript
Raw Permalink Normal View History

2025-01-07 02:58:12 +11:00
/** Intentionally does nothing; handy as a callback that discards its input. */
function noop(): void {
  return;
}
/**
 * A promise that additionally exposes the functions used to settle it,
 * so code holding the promise can resolve or reject it from the outside.
 */
export interface Deferred<T = void> extends Promise<T> {
/** Settles the promise with `value`, or makes it follow `value` if that is itself promise-like. */
resolve(value: T | PromiseLike<T>): void;
/** Rejects the promise with the optional `reason`. */
reject(reason?: unknown): void;
}
/**
 * Creates a pending promise that carries its own `resolve` and `reject`
 * functions as properties.
 *
 * @returns The promise itself (not a wrapper), augmented with the two
 *          settle functions.
 */
export function deferred<T = void>(): Deferred<T> {
  const resolvers = Promise.withResolvers<T>();
  // Attach the settle functions directly onto the promise object and hand
  // that same object back.
  return Object.assign(resolvers.promise, {
    resolve: resolvers.resolve,
    reject: resolvers.reject,
  });
}
/**
 * Creates a one-shot broadcast point.
 *
 * `listen()` returns a promise shared by every caller until the next
 * `notify(value)`, which resolves that promise with `value` and forgets it,
 * so later `listen()` calls get a fresh one. Calling `notify` while nobody
 * is listening does nothing.
 */
export function notifier<T = void>() {
  let waiting: Deferred<T> | null = null;
  return {
    listen(): Promise<T> {
      // Lazily create the shared promise on first demand.
      if (waiting === null) waiting = deferred<T>();
      return waiting;
    },
    notify(value: T) {
      if (waiting === null) return;
      waiting.resolve(value);
      waiting = null;
    },
  };
}
/** The two ends of a channel: `send` puts values in, `recv` takes them out. */
export interface Channel<T> {
/** Producer end. */
send: Sender<T>;
/** Consumer end. */
recv: Receiver<T>;
}
/**
 * Producer end of a channel. The object is itself callable and also
 * carries a `close` method.
 */
export interface Sender<T> {
/** Sends `value` into the channel. */
(value: T): void;
/**
 * Closes the channel. In the `channel()` implementation in this file a
 * non-nullish `reason` is remembered and thrown by later send/recv calls.
 */
close(reason?: unknown): void;
}
/**
 * Consumer end of a channel. The object is itself callable and also
 * carries `try` and `close` methods.
 */
export interface Receiver<T> {
/**
 * Takes the next value. The result may be the value itself or a promise
 * for it; a `null` result indicates the channel has been closed.
 */
(): Promise<T | null> | T | null;
/** Non-waiting take: returns a value synchronously, or `null` when none is available. */
try(): T | null;
/**
 * Closes the channel. In the `channel()` implementation in this file a
 * non-nullish `reason` is remembered and thrown by later send/recv calls.
 */
close(reason?: unknown): void;
}
/**
 * Creates an unbounded FIFO channel and returns its two ends.
 *
 * - `send(value)` hands the value to the oldest waiting receiver, or
 *   buffers it when nobody is waiting.
 * - `recv()` returns a buffered value synchronously when one exists,
 *   otherwise a promise that settles when a value arrives or the channel
 *   is closed.
 * - `recv.try()` never waits: it returns a buffered value or `null`.
 * - `send.close(reason?)` and `recv.close(reason?)` are the same function.
 *   The first call wins; later calls are ignored.
 *
 * Closing without a reason (or with `null`/`undefined`) is a graceful
 * close: waiting receivers resolve with `null`, later sends are ignored,
 * and values that were already buffered stay available, so `recv()` and
 * `recv.try()` drain them before reporting `null`. (Previously those
 * buffered values were silently discarded, losing data that had been sent
 * before the close.)
 *
 * Closing with a reason is an abort: waiting receivers reject with the
 * reason, buffered values are discarded, and later `send()`/`recv()`
 * calls throw the reason synchronously. `recv.try()` returns `null`.
 */
export function channel<T = void>(): Channel<T> {
  // A slot is either a buffered value (`has: true`) or a receiver that is
  // still waiting for one (`has: false`).
  type Entry =
    | { has: true; value: T }
    | { has: false; value: Deferred<T | null> };
  const q = new Map<number, Entry>();
  let head = 0; // slot index the next send() will use
  let tail = 0; // slot index the next recv() will use
  let open = true;
  let err: unknown = null;

  // Removes and returns the buffered value at `tail`, or null if that slot
  // holds no value.
  function take(): T | null {
    const n = q.get(tail);
    if (n === undefined || !n.has) return null;
    q.delete(tail++);
    return n.value;
  }

  function send(value: T) {
    if (!open) {
      if (err !== null) throw err;
      return;
    }
    const i = head++;
    const n = q.get(i);
    if (n === undefined) {
      // Nobody is waiting on this slot: buffer the value.
      q.set(i, { has: true, value });
    } else if (!n.has) {
      // A receiver got here first: hand the value over directly.
      q.delete(i);
      n.value.resolve(value);
    } else {
      // Defensive: slot already holds a value; overwrite it.
      n.value = value;
    }
  }

  function recv() {
    if (!open) {
      if (err !== null) throw err;
      // Graceful close: drain what was buffered, then report null.
      return take();
    }
    const i = tail++;
    const n = q.get(i);
    if (n === undefined) {
      // Nothing buffered yet: park a waiter in this slot.
      const waiter = deferred<T | null>();
      q.set(i, { has: false, value: waiter });
      return waiter;
    }
    if (n.has) {
      q.delete(i);
      return n.value;
    }
    // Defensive: a waiter is already registered for this slot.
    return n.value;
  }

  function close(reason?: unknown) {
    if (!open) return;
    open = false;
    err = reason ?? null;
    if (err !== null) {
      // Abort: fail every waiting receiver and drop anything buffered.
      for (const n of q.values()) {
        if (!n.has) n.value.reject(err);
      }
      q.clear();
      return;
    }
    // Graceful close: release waiting receivers but keep buffered values
    // so they can still be received.
    for (const [i, n] of q) {
      if (n.has) continue;
      n.value.resolve(null);
      q.delete(i);
    }
  }

  recv.try = take;
  recv.close = close;
  send.close = close;
  return { send, recv };
}
/**
 * Creates a channel, hands its receiving end to `f`, and returns the
 * sending end to the caller.
 *
 * When `f` completes (returns, or its promise fulfills) the channel is
 * closed without a reason; `noop` drops the fulfillment value so it is not
 * mistaken for a close reason. If the promise returned by `f` rejects, the
 * channel is closed with the rejection reason. A synchronous throw from
 * `f` propagates to the caller of `channel.sender`.
 *
 * @param f Consumer that reads from the channel via `recv`.
 * @returns The `Sender` used to feed values to `f`.
 */
channel.sender = function sender<T>(
  f: (recv: Receiver<T>) => void | PromiseLike<void>
): Sender<T> {
  const { send, recv } = channel<T>();
  Promise.resolve(f(recv)).then(noop).then(recv.close, recv.close);
  return send;
};
/**
 * Creates a channel, hands its sending end to `f`, and returns the
 * receiving end to the caller.
 *
 * When `f` completes (returns, or its promise fulfills) the channel is
 * closed without a reason; `noop` drops the fulfillment value so it is not
 * mistaken for a close reason. If the promise returned by `f` rejects, the
 * channel is closed with the rejection reason. A synchronous throw from
 * `f` propagates to the caller of `channel.receiver`.
 *
 * @param f Producer that writes to the channel via `send`.
 * @returns The `Receiver` from which the produced values are read.
 */
channel.receiver = function receiver<T>(
  f: (send: Sender<T>) => void | PromiseLike<void>
): Receiver<T> {
  const { send, recv } = channel<T>();
  Promise.resolve(f(send)).then(noop).then(send.close, send.close);
  return recv;
};
/**
 * Creates a counting semaphore with `count` initial permits.
 *
 * The returned `acquire()` resolves with the `release` function once a
 * permit is held; when no permit is free it waits on an internal channel
 * until some holder calls `release()`. `release` is also installed under
 * `Symbol.dispose`, and is reachable as `acquire.release`.
 *
 * `acquire.reset(new_count)` overwrites the current permit counter and the
 * default used by later resets; it does not wake callers that are already
 * waiting.
 *
 * @param count Number of permits initially available (default 1).
 */
export function semaphore(count = 1) {
  const gate = channel();
  // Free permits; a negative value is the number of callers waiting.
  let permits = count;

  function release() {
    const someoneWaiting = permits < 0;
    permits += 1;
    if (someoneWaiting) gate.send();
  }

  async function acquire() {
    permits -= 1;
    if (permits < 0) await gate.recv();
    return release;
  }

  function reset(new_count = count) {
    count = new_count;
    permits = new_count;
  }

  acquire.release = release;
  acquire.reset = reset;
  release[Symbol.dispose] = release;
  release satisfies Disposable;
  return acquire;
}
2025-01-07 21:46:23 +11:00
/**
 * Creates a single-permit serializer built on promise chaining.
 *
 * Each call to the returned `acquire(f)` schedules `f` to run after the
 * previously scheduled task has settled (whether it fulfilled or rejected)
 * and returns a promise for `f`'s own outcome.
 */
export function semaphore_fast() {
  // Promise for the most recently scheduled task; starts already settled.
  let previous: Promise<unknown> = Promise.resolve(undefined);
  return function acquire<T>(f: () => T | PromiseLike<T>) {
    // Run `f` no matter how the predecessor ended.
    const current = previous.then(f, f);
    previous = current;
    return current;
  };
}