luby/crates/lb/src/task.rs

118 lines
3.7 KiB
Rust

//! # Task library
//!
//! The `lb:task` library provides primitives for asynchronous communication between tasks via
//! message passing channels.
//!
//! ## Exports
//!
//! See [`lb_tasklib`] for items exported by this library.
use crate::runtime::spawn;
use luaffi::{
cdef,
marker::{function, many},
metatype,
};
use std::{cell::RefCell, ffi::c_int, time::Duration};
use tokio::{task::JoinHandle, time::sleep};
/// Items exported by the `lb:task` library.
///
/// This library can be acquired by calling
/// [`require("lb:task")`](https://www.lua.org/manual/5.1/manual.html#pdf-require).
///
/// ```lua
/// local task = require("lb:task");
/// ```
///
/// The exports defined in this file are [`sleep`](lb_tasklib::sleep), which suspends the calling
/// task for a number of milliseconds, and [`spawn`](lb_tasklib::spawn), which starts a function as
/// a new task and returns an [`lb_task`] handle for it.
#[cdef(module = "lb:task")]
pub struct lb_tasklib;
#[metatype]
#[include("task.lua")]
impl lb_tasklib {
    // Constructs the library value; `lb_tasklib` is a unit struct and carries no state.
    #[new]
    extern "Lua-C" fn new() -> Self {
        Self
    }

    // Suspends the calling task for `ms` milliseconds.
    //
    // Negative and NaN values are clamped to zero by `f64::max`. Values too large to fit in a
    // `Duration` (including infinity) saturate to `Duration::MAX` rather than panicking, which
    // `Duration::from_secs_f64` would do for such inputs.
    pub async extern "Lua-C" fn sleep(ms: f64) {
        let secs = ms.max(0.) / 1000.;
        let duration = Duration::try_from_secs_f64(secs).unwrap_or(Duration::MAX);
        sleep(duration).await;
    }

    // Spawns `f` as a new task, forwarding any extra arguments to it, and returns the task's
    // handle. Raises a Lua error if `f` is not a function.
    pub extern "Lua" fn spawn(f: function, ...) -> lb_task {
        // pack the function and its arguments into a table and pass its ref to rust.
        //
        // this "state" table is used from rust-side to call the function with its args, and it's
        // also reused to store its return values that the task handle can return when awaited. the
        // ref is owned by the task handle and unref'ed when it's gc'ed.
        assert(
            r#type(f) == "function",
            concat!("function expected in argument 'f', got ", r#type(f)),
        );
        // we need two refs: one for the spawn call, and the other for the task handle. this is to
        // ensure the task handle isn't gc'ed and the state table unref'ed before the spawn callback
        // runs and puts the state table on the stack.
        let state = __tpack(f, variadic!());
        Self::__spawn(__ref(state), __ref(state))
    }

    // Rust half of `spawn`.
    //
    // `spawn_ref` and `handle_ref` are two separate refs to the same state table: `spawn_ref` is
    // consumed by the spawned task itself, while `handle_ref` is handed to the returned `lb_task`.
    extern "Lua-C" fn __spawn(spawn_ref: c_int, handle_ref: c_int) -> lb_task {
        let handle = spawn(async move |cx| {
            // SAFETY: spawn_ref is always unique, created in Self::spawn above.
            let state = unsafe { luajit::Ref::from_raw(cx, spawn_ref) };
            let mut s = cx.guard();
            s.resize(0);
            s.push(state); // this drops the state table ref, but the table is still on the stack
            let narg = s.unpack(1, 1, None) - 1; // unpack the function and its args from the state table
            match s.call_async(narg, None).await {
                Ok(nret) => {
                    s.pack(1, nret); // pack the return values back into the state table
                }
                Err(err) => {
                    // the stack guard is released before the error is reported. the state table is
                    // not repacked on this path.
                    drop(s);
                    cx.report_error(&err);
                }
            }
        });
        // handle_ref is owned by the task handle and unref'ed there when the handle gets gc'ed
        lb_task::new(handle, handle_ref)
    }
}
/// Handle for an asynchronous task created by [`spawn`](lb_tasklib::spawn).
///
/// The handle pairs the join handle of the spawned task with a ref to the task's state table,
/// which is where the task's return values are stored once it completes.
#[cdef]
pub struct lb_task {
// join handle of the spawned task; `take`n (left as `None`) by the first await
#[opaque]
handle: RefCell<Option<JoinHandle<()>>>,
// registry ref to the task's state table; unref'ed when the handle is gc'ed
__ref: c_int,
}
#[metatype]
impl lb_task {
// Wraps the join handle of a spawned task together with `ref_key`, the ref to the task's state
// table that this handle takes ownership of (it is unref'ed in `gc`).
fn new(handle: JoinHandle<()>, ref_key: c_int) -> Self {
lb_task {
handle: RefCell::new(Some(handle)),
__ref: ref_key,
}
}
// Waits for the task to finish, then returns everything stored in its state table (entries
// `1..=n`).
//
// On success the state table holds the task's return values, packed there by
// `lb_tasklib::__spawn`.
//
// NOTE(review): when the task fails, `__spawn` reports the error without repacking the state
// table, so this presumably returns the original function and its arguments — confirm what
// callers are expected to observe in that case.
pub async extern "Lua" fn r#await(&self) -> many {
self.__await();
let ret = __registry[self.__ref];
__tunpack(ret, 1, ret.n)
}
// Awaits the task's join handle if it has not been taken yet; once taken, later calls find
// `None` and return immediately. A panic inside the task is resumed on the awaiting side.
//
// NOTE(review): the `RefMut` temporary produced by `borrow_mut()` appears to stay alive for the
// whole `if let` body, i.e. across the `.await`. A second await on the same handle while the
// first is still pending would then panic in `borrow_mut()` — confirm whether concurrent awaits
// on one handle are possible.
// NOTE(review): `into_panic()` itself panics if the join error is a cancellation rather than a
// panic; nothing in this file aborts the task, but confirm that holds elsewhere.
async extern "Lua-C" fn __await(&self) {
if let Some(handle) = self.handle.borrow_mut().take() {
handle
.await // task handler should never panic
.unwrap_or_else(|err| std::panic::resume_unwind(err.into_panic()));
}
}
// Finalizer: releases this handle's ref to the state table. The spawned task itself is not
// aborted here.
#[gc]
extern "Lua" fn gc(&self) {
__unref(self.__ref);
}
}