lstd/async.ts
2025-01-07 02:58:12 +11:00

205 lines
4.4 KiB
TypeScript

/** Accepts no arguments, does nothing, and returns `undefined`. */
function noop(): void {
  // intentionally empty
}
/**
 * A promise that also exposes its own settle functions as properties, so it
 * can be resolved or rejected by code that did not create it.
 */
export interface Deferred<T = void> extends Promise<T> {
/** Settles the promise with `value`; a thenable may be passed in place of a plain value. */
resolve(value: T | PromiseLike<T>): void;
/** Rejects the promise with the optional `reason`. */
reject(reason?: unknown): void;
}
/**
 * Creates a pending promise and attaches its `resolve` and `reject` functions
 * to the promise object itself.
 *
 * @returns The promise obtained from `Promise.withResolvers`, typed as a
 *   `Deferred<T>` and carrying `resolve` / `reject` as own properties.
 */
export function deferred<T = void>() {
  const resolvers = Promise.withResolvers<T>();
  const result = resolvers.promise as Deferred<T>;
  result.resolve = resolvers.resolve;
  result.reject = resolvers.reject;
  return result;
}
/**
 * Creates a one-slot notification primitive.
 *
 * All `listen()` calls made between two notifications share a single promise;
 * `notify(value)` resolves that promise and clears the slot, so the next
 * `listen()` starts a fresh one. Calling `notify` while nobody is listening
 * does nothing (the value is not stored).
 *
 * @returns An object with `listen` and `notify` methods.
 */
export function notifier<T = void>() {
  let pending: Deferred<T> | null = null;
  return {
    /** Returns the promise for the next notification, creating it on demand. */
    listen(): Promise<T> {
      if (pending === null) pending = deferred<T>();
      return pending;
    },
    /** Resolves the currently shared promise (if any) with `value`. */
    notify(value: T) {
      if (pending === null) return;
      // Resolve first, then drop the reference, matching the original order.
      pending.resolve(value);
      pending = null;
    },
  };
}
/** The two ends of a channel: a `send` function and a `recv` function. */
export interface Channel<T> {
/** The sending end. */
send: Sender<T>;
/** The receiving end. */
recv: Receiver<T>;
}
/**
 * The sending end of a channel: a callable that also has a `close` method.
 *
 * The notes below describe the `channel()` implementation in this file.
 */
export interface Sender<T> {
/**
 * Sends `value`. It is handed to the oldest waiting receiver if there is one,
 * otherwise it is buffered. On a closed channel the call throws the close
 * reason if one was given and is otherwise ignored.
 */
(value: T): void;
/** Closes the channel, optionally with a `reason` that is treated as an error. */
close(reason?: unknown): void;
}
/**
 * The receiving end of a channel: a callable that also has `try` and `close`
 * methods.
 *
 * The notes below describe the `channel()` implementation in this file.
 */
export interface Receiver<T> {
/**
 * Receives the next value. A buffered value is returned directly; otherwise a
 * promise is returned that settles when a value is sent or the channel is
 * closed. On a closed channel the call returns `null`, or throws the close
 * reason if one was given.
 */
(): Promise<T | null> | T | null;
/** Returns a buffered value if one is available, otherwise `null`; never waits. */
try(): T | null;
/** Closes the channel, optionally with a `reason` that is treated as an error. */
close(reason?: unknown): void;
}
/**
 * Creates an unbounded FIFO channel.
 *
 * Sends and receives are matched by sequence number: the n-th `send` pairs
 * with the n-th `recv`. Whichever side arrives first parks an entry in a map
 * under that number; the other side removes it when it arrives.
 *
 * - `send(value)` hands the value to the matching waiting receiver, or buffers
 *   it. On a closed channel it throws the close reason if one was given, and
 *   is otherwise a no-op.
 * - `recv()` returns a buffered value directly, or a promise for a future
 *   value. On a closed channel it returns `null`, or throws the close reason.
 * - `recv.try()` returns a buffered value or `null` without ever waiting.
 * - `send.close` / `recv.close` are the same function. Closing settles every
 *   waiting receiver (rejecting with the reason, or resolving with `null`) and
 *   discards all buffered values. Closing twice has no further effect.
 *
 * @returns The `{ send, recv }` pair.
 */
export function channel<T = void>(): Channel<T> {
  // A parked entry is either a buffered value or a waiting receiver's promise.
  type Entry =
    | { has: true; value: T }
    | { has: false; value: Deferred<T | null> };

  const q = new Map<number, Entry>();
  let head = 0; // sequence number the next send will use
  let tail = 0; // sequence number the next recv will use
  let open = true;
  let err: unknown = null; // close reason; null means "closed cleanly"

  function send(value: T) {
    if (!open) {
      if (err !== null) throw err;
      return;
    }
    const slot = head++;
    const entry = q.get(slot);
    if (entry === undefined) {
      // No receiver is waiting for this slot yet: buffer the value.
      q.set(slot, { has: true, value });
    } else if (entry.has) {
      // A value is already parked here; this looks unreachable in practice,
      // but the original overwrite behaviour is retained.
      entry.value = value;
    } else {
      // A receiver is waiting on this slot: hand the value over.
      q.delete(slot);
      entry.value.resolve(value);
    }
  }

  function recv() {
    if (!open) {
      if (err !== null) throw err;
      return null;
    }
    const slot = tail++;
    const entry = q.get(slot);
    if (entry === undefined) {
      // Nothing has been sent for this slot yet: park a promise for it.
      const waiter = deferred<T | null>();
      q.set(slot, { has: false, value: waiter });
      return waiter;
    }
    if (entry.has) {
      q.delete(slot);
      return entry.value;
    }
    // A receiver promise is already parked here; hand back the same promise.
    return entry.value;
  }

  function try_recv() {
    if (!open) return null;
    const entry = q.get(tail);
    if (entry === undefined || !entry.has) return null;
    q.delete(tail);
    tail += 1;
    return entry.value;
  }

  function close(reason?: unknown) {
    if (!open) return;
    open = false;
    err = reason ?? null;
    for (const entry of q.values()) {
      if (entry.has) continue; // buffered values are simply dropped below
      if (err !== null) entry.value.reject(err);
      else entry.value.resolve(null);
    }
    q.clear();
  }

  recv.try = try_recv;
  recv.close = close;
  send.close = close;
  return { send, recv };
}
/**
 * Creates a channel, passes its receiving end to `f`, and returns the sending
 * end.
 *
 * When the result of `f` settles, the channel is closed: without a reason on
 * fulfilment (the fulfilment value is discarded first), or with the rejection
 * reason on rejection. `f` is invoked synchronously, so an exception it throws
 * before returning propagates to the caller of `channel.sender`.
 *
 * @param f Consumer that is given the receiving end.
 * @returns The sending end of the new channel.
 */
channel.sender = function sender<T>(
  f: (recv: Receiver<T>) => void | PromiseLike<void>
): Sender<T> {
  const { send, recv } = channel<T>();
  // Drop any fulfilment value so `close` is not called with it as a reason.
  const finished = Promise.resolve(f(recv)).then(noop);
  finished.then(recv.close, recv.close);
  return send;
};
/**
 * Creates a channel, passes its sending end to `f`, and returns the receiving
 * end.
 *
 * When the result of `f` settles, the channel is closed: without a reason on
 * fulfilment (the fulfilment value is discarded first), or with the rejection
 * reason on rejection. `f` is invoked synchronously, so an exception it throws
 * before returning propagates to the caller of `channel.receiver`.
 *
 * @param f Producer that is given the sending end.
 * @returns The receiving end of the new channel.
 */
channel.receiver = function receiver<T>(
  f: (send: Sender<T>) => void | PromiseLike<void>
): Receiver<T> {
  const { send, recv } = channel<T>();
  // Drop any fulfilment value so `close` is not called with it as a reason.
  const finished = Promise.resolve(f(send)).then(noop);
  finished.then(send.close, send.close);
  return recv;
};
/**
 * Creates a counting semaphore.
 *
 * The returned function is `acquire`: it takes one permit, waiting if none is
 * left, and resolves to the `release` function. `release` is also installed
 * under `Symbol.dispose` on itself, so the resolved value is a `Disposable`.
 *
 * Extra members on the returned function:
 * - `release()` gives one permit back and wakes one waiter if any is queued.
 * - `reset(new_count?)` sets both the current permit counter and the default
 *   used by later `reset()` calls. It only reassigns the counter; callers that
 *   are already waiting are not signalled by it.
 *
 * @param count Initial number of permits (default 1).
 * @returns The `acquire` function with `release` and `reset` attached.
 */
export function semaphore(count = 1) {
  const { send: signal, recv: wait } = channel();
  // Remaining permits; a negative value is the number of queued waiters.
  let n = count;

  async function acquire() {
    n -= 1;
    if (n < 0) await wait();
    return release;
  }

  function release() {
    const hadWaiter = n < 0;
    n += 1;
    if (hadWaiter) signal();
  }

  function reset(new_count = count) {
    count = new_count;
    n = new_count;
  }

  acquire.release = release;
  acquire.reset = reset;
  release[Symbol.dispose] = release;
  release satisfies Disposable;
  return acquire;
}
/**
 * A handle on a value borrowed from a pool. It is also iterable over `T`.
 *
 * The notes below describe the token class created by `pool()` in this file.
 */
export interface PoolToken<T> extends Iterable<T, void, void> {
/** The borrowed value; reading it after `release()` throws a `TypeError`. */
readonly value: T;
/** Gives the value back to the pool; calls after the first have no effect. */
release(): void;
}
/**
 * Creates a pool of lazily constructed values with at most `size` borrowed at
 * any one time.
 *
 * The returned function is `acquire`: it waits for a free slot, reuses an idle
 * value if one exists (most recently returned first), otherwise calls `f` to
 * make a new one, and resolves to a token wrapping the value. If `f` fails,
 * the slot is given back and the error is rethrown.
 *
 * `acquire.forget(value)` removes a value from the pool's bookkeeping: it is
 * dropped from the idle list if present and will not be re-added when its
 * token is released.
 *
 * @param size Maximum number of simultaneously borrowed values.
 * @param f Factory for new values; may return a value or a thenable.
 * @returns The `acquire` function with `forget` attached.
 */
export function pool<T>(size: number, f: () => T | PromiseLike<T>) {
  const lock = semaphore(size);
  const all = new Set<T>(); // every value the pool still tracks
  const free: T[] = []; // tracked values that are currently idle

  const Token = class implements PoolToken<T> {
    #value;
    #open = true;

    constructor(value: T) {
      this.#value = value;
    }

    get value() {
      if (!this.#open) {
        throw new TypeError(`borrowed value is already released`);
      }
      return this.#value;
    }

    // Yields the borrowed value exactly once.
    *[Symbol.iterator]() {
      yield this.value;
    }

    release() {
      if (!this.#open) return;
      lock.release();
      // A value that was forgotten while borrowed is not returned to the pool.
      if (all.has(this.#value)) free.push(this.#value);
      this.#open = false;
    }

    [Symbol.dispose]() {
      this.release();
    }
  };

  async function acquire(): Promise<PoolToken<T>> {
    await lock();
    try {
      let value;
      if (free.length === 0) value = await f();
      else value = free.pop() as T;
      all.add(value);
      return new Token(value);
    } catch (e) {
      // Give the slot back before surfacing the failure.
      lock.release();
      throw e;
    }
  }

  function forget(value: T) {
    if (!all.delete(value)) return;
    const at = free.indexOf(value);
    if (at !== -1) free.splice(at, 1);
  }

  acquire.forget = forget;
  return acquire;
}