154 lines
3.4 KiB
TypeScript
154 lines
3.4 KiB
TypeScript
function noop() {}
|
|
|
|
export interface Deferred<T = void> extends Promise<T> {
|
|
resolve(value: T | PromiseLike<T>): void;
|
|
reject(reason?: unknown): void;
|
|
}
|
|
|
|
export function deferred<T = void>() {
|
|
const { promise, resolve, reject } = Promise.withResolvers<T>();
|
|
const p = promise as Deferred<T>;
|
|
return (p.resolve = resolve), (p.reject = reject), p;
|
|
}
|
|
|
|
export function notifier<T = void>() {
|
|
let p: Deferred<T> | null = null;
|
|
|
|
return {
|
|
listen(): Promise<T> {
|
|
return (p ??= deferred());
|
|
},
|
|
|
|
notify(value: T) {
|
|
p &&= (p.resolve(value), null);
|
|
},
|
|
};
|
|
}
|
|
|
|
export interface Channel<T> {
|
|
send: Sender<T>;
|
|
recv: Receiver<T>;
|
|
}
|
|
|
|
export interface Sender<T> {
|
|
(value: T): void;
|
|
close(reason?: unknown): void;
|
|
}
|
|
|
|
export interface Receiver<T> {
|
|
(): Promise<T | null> | T | null;
|
|
try(): T | null;
|
|
close(reason?: unknown): void;
|
|
}
|
|
|
|
export function channel<T = void>(): Channel<T> {
|
|
const q = new Map<number, Entry>();
|
|
let head = 0;
|
|
let tail = 0;
|
|
let open = true;
|
|
let err: unknown = null;
|
|
|
|
type Entry =
|
|
| { has: true; value: T }
|
|
| { has: false; value: Deferred<T | null> };
|
|
|
|
function send(value: T) {
|
|
if (open) {
|
|
const i = head++;
|
|
let n = q.get(i);
|
|
if (!n) q.set(i, (n = { has: true, value }));
|
|
else if (!n.has) q.delete(i), n.value.resolve(value);
|
|
else n.value = value;
|
|
} else {
|
|
if (err !== null) throw err;
|
|
else return;
|
|
}
|
|
}
|
|
|
|
function recv() {
|
|
if (open) {
|
|
const i = tail++;
|
|
let n = q.get(i);
|
|
if (!n) return q.set(i, (n = { has: false, value: deferred() })), n.value;
|
|
else if (!n.has) return n.value;
|
|
else return q.delete(i), n.value;
|
|
} else {
|
|
if (err !== null) throw err;
|
|
else return null;
|
|
}
|
|
}
|
|
|
|
function try_recv() {
|
|
if (open) {
|
|
const n = q.get(tail);
|
|
if (n?.has) return q.delete(tail++), n.value;
|
|
else return null;
|
|
} else {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
recv.try = try_recv;
|
|
send.close = recv.close = function close(reason?: unknown) {
|
|
if (!open) return;
|
|
else (open = false), (err = reason ?? null);
|
|
for (const p of q.values()) {
|
|
if (p.has) continue;
|
|
else if (err !== null) p.value.reject(err);
|
|
else p.value.resolve(null);
|
|
}
|
|
q.clear();
|
|
};
|
|
|
|
return { send, recv };
|
|
}
|
|
|
|
channel.sender = function sender<T>(
|
|
f: (recv: Receiver<T>) => void | PromiseLike<void>
|
|
): Sender<T> {
|
|
const { send, recv } = channel<T>();
|
|
Promise.resolve(f(recv)).then(noop).then(recv.close, recv.close);
|
|
return send;
|
|
};
|
|
|
|
channel.receiver = function receiver<T>(
|
|
f: (send: Sender<T>) => void | PromiseLike<void>
|
|
): Receiver<T> {
|
|
const { send, recv } = channel<T>();
|
|
Promise.resolve(f(send)).then(noop).then(send.close, send.close);
|
|
return recv;
|
|
};
|
|
|
|
export function semaphore(count = 1) {
|
|
const { send: signal, recv: wait } = channel();
|
|
let n = count;
|
|
|
|
async function acquire() {
|
|
if (--n < 0) await wait();
|
|
return release;
|
|
}
|
|
|
|
function release() {
|
|
if (n++ < 0) signal();
|
|
}
|
|
|
|
function reset(new_count = count) {
|
|
n = count = new_count;
|
|
}
|
|
|
|
acquire.release = release;
|
|
acquire.reset = reset;
|
|
release[Symbol.dispose] = release;
|
|
release satisfies Disposable;
|
|
|
|
return acquire;
|
|
}
|
|
|
|
export function semaphore_fast() {
|
|
let last = Promise.resolve<unknown>(undefined);
|
|
|
|
return function acquire<T>(f: () => T | PromiseLike<T>) {
|
|
return (last = last.then(f, f));
|
|
};
|
|
}
|