diff --git a/CHANGELOG.md b/CHANGELOG.md index bc37ffc586..68cc200dca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1 @@ -* Added UUID module -* Implemented `randomUUID`, `randomFill` and `randomFillSync` APIs in `crypto` module +* Much faster timers. Timers are also now more consistent with Node.js event loop behavior. \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 9b3f13ac7e..c55ae7b203 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ md-5 = { version = "0.10.6", features = ["asm"] } ring = "0.17.5" rquickjs = { git = "https://github.com/DelSkayn/rquickjs", rev = "e168c0a4a242f55723558268fd0269351c5293ae", features = [ "full-async", + "parallel", ], default-features = false } tokio = { version = "1", features = ["full"] } tracing = { version = "0.1.40", features = ["log"] } diff --git a/index.mjs b/index.mjs index e519ccf788..fe44d7f922 100644 --- a/index.mjs +++ b/index.mjs @@ -5,7 +5,6 @@ const client = new DynamoDBClient({}); const docClient = DynamoDBDocumentClient.from(client); export const handler = async (event) => { - const start = Date.now(); await docClient.send( new PutCommand({ TableName: process.env.TABLE_NAME, @@ -15,8 +14,6 @@ export const handler = async (event) => { }, }) ); - const end = Date.now(); - console.log(`Done in ${end - start}ms`); return { statusCode: 200, body: "OK", diff --git a/src/bytearray_buffer.rs b/src/bytearray_buffer.rs index dc0a2a6a15..c718dda4f3 100644 --- a/src/bytearray_buffer.rs +++ b/src/bytearray_buffer.rs @@ -37,6 +37,7 @@ impl BytearrayBuffer { self.len.load(Ordering::Relaxed) } + #[allow(dead_code)] pub fn write_forced(&self, item: &[u8]) { let mut inner = self.inner.lock().unwrap(); inner.extend(item); @@ -75,6 +76,7 @@ impl BytearrayBuffer { } } + #[allow(dead_code)] pub fn is_closed(&self) -> bool { self.closed.load(Ordering::Relaxed) } diff --git a/src/net/socket.rs b/src/net/socket.rs index d0192f0f97..aa9ce1c6be 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -126,6 +126,7 @@ pub struct Server<'js> { } #[rquickjs::class] +#[allow(dead_code)] pub struct Socket<'js> { emitter: EventEmitter<'js>, readable_stream_inner: ReadableStreamInner<'js>, @@ -542,6 +543,8 @@ impl<'js> Server<'js> { self.address.clone() } + #[allow(unused_assignments)] + ///TODO add backlog support pub fn listen( this: This>, ctx: Ctx<'js>, @@ -551,6 +554,7 @@ impl<'js> Server<'js> { let mut port = None; let mut path = None; let mut host = None; + #[allow(unused_variables)] //TODO add backlog support let mut backlog = None; let mut callback = None; diff --git a/src/stream/readable.rs b/src/stream/readable.rs index 9f499a8070..d64d2fda9e 100644 --- a/src/stream/readable.rs +++ b/src/stream/readable.rs @@ -32,6 +32,7 @@ pub enum ReadableState { Paused, } +#[allow(dead_code)] pub struct ReadableStreamInner<'js> { emitter: EventEmitter<'js>, destroy_tx: Sender>>, diff --git a/src/stream/writable.rs b/src/stream/writable.rs index da98d8877e..70ac784b41 100644 --- a/src/stream/writable.rs +++ b/src/stream/writable.rs @@ -29,6 +29,7 @@ pub struct WritableStreamInner<'js> { command_tx: UnboundedSender>, command_rx: Option>>, is_finished: bool, + #[allow(dead_code)] errored: bool, emit_close: bool, is_destroyed: bool, @@ -55,6 +56,7 @@ impl<'js> WritableStreamInner<'js> { } #[derive(Debug)] +#[allow(dead_code)] pub enum WriteCommand<'js> { End, Write(Vec, Option>, bool), diff --git a/src/timers.rs b/src/timers.rs index 144c437509..3f5738df73 100644 --- a/src/timers.rs +++ b/src/timers.rs @@ -1,102 +1,67 @@ use std::{ - sync::Arc, - time::{Duration, Instant}, + mem, + ptr::NonNull, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use rquickjs::{ - function::Opt, module::{Declarations, Exports, ModuleDef}, prelude::Func, - Class, Ctx, Function, Result, + qjs, Ctx, Exception, Function, Result, }; -use tokio::sync::Notify; use crate::{util::export_default, vm::CtxExtension}; -#[rquickjs::class] -#[derive(rquickjs::class::Trace)] -struct Timeout { - #[qjs(skip_trace)] - abort: Arc, +static TIMER_ID: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug)] +struct Timeout<'js> { + cb: Option>, + timeout: usize, + id: usize, + repeating: bool, + delay: usize, } -fn clear_timeout(_ctx: Ctx<'_>, timeout: Class) -> Result<()> { - timeout.borrow().abort.notify_one(); +fn set_immediate(cb: Function) -> Result<()> { + cb.defer::<()>(())?; Ok(()) } -async fn yield_sleep(duration: Duration) { - if duration.as_millis() == 0 { - tokio::task::yield_now().await; - return; - } - let start_time = Instant::now(); - while Instant::now() - start_time < duration { - tokio::task::yield_now().await; - } +fn get_current_time_millis() -> usize { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|t| t.as_millis() as usize) + .unwrap_or(0) } fn set_timeout_interval<'js>( - ctx: Ctx<'js>, - cb: Function<'js>, - msec: Option, - interval: bool, -) -> Result> { - let msec = msec.unwrap_or(0); - - let abort = Arc::new(Notify::new()); - let abort_ref = abort.clone(); - - ctx.spawn_exit(async move { - loop { - let abort = abort_ref.clone(); - - let aborted; - - tokio::select! { - _ = abort.notified() => { - aborted = true; - }, - _ = yield_sleep(Duration::from_millis(msec)) => { - aborted = false; - } - } - - if !aborted { - cb.call::<(), ()>(())?; - } - - if !interval || aborted { - break; - } - } - drop(cb); - drop(abort_ref); - Ok(()) - })?; - - Class::instance(ctx, Timeout { abort }) -} - -fn set_timeout<'js>( - ctx: Ctx<'js>, + timeouts: &Arc>>>, cb: Function<'js>, - msec: Opt, -) -> Result> { - set_timeout_interval(ctx, cb, msec.0, false) + delay: usize, + repeating: bool, +) -> usize { + let timeout = get_current_time_millis() + delay; + let id = TIMER_ID.fetch_add(1, Ordering::SeqCst); + timeouts.lock().unwrap().push(Timeout { + cb: Some(cb), + timeout, + id, + repeating, + delay, + }); + id } -fn set_interval<'js>( - ctx: Ctx<'js>, - cb: Function<'js>, - msec: Opt, -) -> Result> { - set_timeout_interval(ctx, cb, msec.0, true) -} - -fn set_immediate(cb: Function) -> Result<()> { - cb.defer::<()>(())?; - Ok(()) +fn clear_timeout_interval(timeouts: &Arc>>, id: usize) { + let mut timeouts = timeouts.lock().unwrap(); + if let Some(index) = timeouts.iter().position(|t| t.id == id) { + timeouts.remove(index); + } } pub struct TimersModule; @@ -130,12 +95,97 @@ impl ModuleDef for TimersModule { pub fn init(ctx: &Ctx<'_>) -> Result<()> { let globals = ctx.globals(); - Class::::register(ctx)?; + let timeouts = Arc::new(Mutex::new(Vec::::new())); + let timeouts2 = timeouts.clone(); + let timeouts3 = timeouts.clone(); + let timeouts4 = timeouts.clone(); + let timeouts5 = timeouts.clone(); + + globals.set( + "setTimeout", + Func::from(move |cb, delay| set_timeout_interval(&timeouts, cb, delay, false)), + )?; + + globals.set( + "setInterval", + Func::from(move |cb, delay| set_timeout_interval(&timeouts2, cb, delay, true)), + )?; + + globals.set( + "clearTimeout", + Func::from(move |id: usize| clear_timeout_interval(&timeouts3, id)), + )?; + + globals.set( + "clearInterval", + Func::from(move |id: usize| clear_timeout_interval(&timeouts4, id)), + )?; + + let ctx2 = ctx.clone(); + + ctx.spawn_exit(async move { + let raw_ctx = ctx2.as_raw(); + let rt: *mut qjs::JSRuntime = unsafe { qjs::JS_GetRuntime(raw_ctx.as_ptr()) }; + let mut ctx_ptr = mem::MaybeUninit::<*mut qjs::JSContext>::uninit(); + + let mut interval = tokio::time::interval(Duration::from_millis(1)); + let mut to_call = Some(Vec::new()); + loop { + interval.tick().await; + let mut call_vec = to_call.take().unwrap(); //avoid creating a new vec + let current_time = get_current_time_millis(); + let mut had_items = false; + + timeouts5.lock().unwrap().retain_mut(|timeout| { + had_items = true; + if current_time > timeout.timeout { + if !timeout.repeating { + //do not clone if not not repeating + call_vec.push(timeout.cb.take()); + return false; + } + timeout.timeout = current_time + timeout.delay; + call_vec.push(timeout.cb.clone()); + } + true + }); + + for cb in call_vec.iter_mut() { + if let Some(cb) = cb.take() { + cb.call::<(), ()>(())?; + }; + } + + call_vec.clear(); + to_call.replace(call_vec); + + let result = unsafe { qjs::JS_ExecutePendingJob(rt, ctx_ptr.as_mut_ptr()) }; + + if result < 0 { + let js_context = unsafe { ctx_ptr.assume_init() }; + let ctx_ptr = NonNull::new(js_context) + .expect("executing pending job returned a null context on error"); + + let ctx = unsafe { Ctx::from_raw(ctx_ptr) }; + + let err = ctx.catch(); + + if let Some(x) = err.clone().into_object().and_then(Exception::from_object) { + Err(x.throw())?; + } else { + Err(ctx.throw(err))?; + } + } + + if !had_items && result == 0 { + break; + } + } + timeouts5.lock().unwrap().clear(); + + Ok(()) + })?; - globals.set("setTimeout", Func::from(set_timeout))?; - globals.set("clearTimeout", Func::from(clear_timeout))?; - globals.set("setInterval", Func::from(set_interval))?; - globals.set("clearInterval", Func::from(clear_timeout))?; globals.set("setImmediate", Func::from(set_immediate))?; Ok(()) diff --git a/src/vm.rs b/src/vm.rs index e7fbc195db..6b79ce47de 100644 --- a/src/vm.rs +++ b/src/vm.rs @@ -405,7 +405,7 @@ impl Vm { pub async fn run_and_handle_exceptions<'js, F>(ctx: &AsyncContext, f: F) where - F: FnOnce(Ctx) -> rquickjs::Result<()>, + F: FnOnce(Ctx) -> rquickjs::Result<()> + Send, { ctx.with(|ctx| { f(ctx.clone()) @@ -452,7 +452,7 @@ impl Vm { } } -fn init(ctx: &Ctx, module_names: HashSet<&'static str>) -> Result<()> { +fn init(ctx: &Ctx<'_>, module_names: HashSet<&'static str>) -> Result<()> { let globals = ctx.globals(); globals.set("global", ctx.globals())?; diff --git a/tests/timers.test.ts b/tests/timers.test.ts index 19b8aa2b34..fd2202879d 100644 --- a/tests/timers.test.ts +++ b/tests/timers.test.ts @@ -10,6 +10,12 @@ describe("timers", () => { assert.ok(end - start >= 10); }); + it("should set nested timeout", (done) => { + setTimeout(() => { + setTimeout(done, 10); + }, 10); + }); + it("should clear timeout", async () => { const start = Date.now(); let status = "";