Skip to content

Commit

Permalink
Fast timers (#18)
Browse files Browse the repository at this point in the history
* Fast timers & misc fixes
  • Loading branch information
richarddavison authored Nov 8, 2023
1 parent ab42971 commit f2d39ac
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 90 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
* Added UUID module
* Implemented `randomUUID`, `randomFill` and `randomFillSync` APIs in `crypto` module
* Much faster timers. Timers are also now more consistent with Node.js event loop behavior.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ md-5 = { version = "0.10.6", features = ["asm"] }
ring = "0.17.5"
rquickjs = { git = "https://github.com/DelSkayn/rquickjs", rev = "e168c0a4a242f55723558268fd0269351c5293ae", features = [
"full-async",
"parallel",
], default-features = false }
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1.40", features = ["log"] }
Expand Down
3 changes: 0 additions & 3 deletions index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);

export const handler = async (event) => {
const start = Date.now();
await docClient.send(
new PutCommand({
TableName: process.env.TABLE_NAME,
Expand All @@ -15,8 +14,6 @@ export const handler = async (event) => {
},
})
);
const end = Date.now();
console.log(`Done in ${end - start}ms`);
return {
statusCode: 200,
body: "OK",
Expand Down
2 changes: 2 additions & 0 deletions src/bytearray_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl BytearrayBuffer {
self.len.load(Ordering::Relaxed)
}

#[allow(dead_code)]
pub fn write_forced(&self, item: &[u8]) {
let mut inner = self.inner.lock().unwrap();
inner.extend(item);
Expand Down Expand Up @@ -75,6 +76,7 @@ impl BytearrayBuffer {
}
}

#[allow(dead_code)]
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
Expand Down
4 changes: 4 additions & 0 deletions src/net/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub struct Server<'js> {
}

#[rquickjs::class]
#[allow(dead_code)]
pub struct Socket<'js> {
emitter: EventEmitter<'js>,
readable_stream_inner: ReadableStreamInner<'js>,
Expand Down Expand Up @@ -542,6 +543,8 @@ impl<'js> Server<'js> {
self.address.clone()
}

#[allow(unused_assignments)]
///TODO add backlog support
pub fn listen(
this: This<Class<'js, Self>>,
ctx: Ctx<'js>,
Expand All @@ -551,6 +554,7 @@ impl<'js> Server<'js> {
let mut port = None;
let mut path = None;
let mut host = None;
#[allow(unused_variables)] //TODO add backlog support
let mut backlog = None;
let mut callback = None;

Expand Down
1 change: 1 addition & 0 deletions src/stream/readable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum ReadableState {
Paused,
}

#[allow(dead_code)]
pub struct ReadableStreamInner<'js> {
emitter: EventEmitter<'js>,
destroy_tx: Sender<Option<Value<'js>>>,
Expand Down
2 changes: 2 additions & 0 deletions src/stream/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct WritableStreamInner<'js> {
command_tx: UnboundedSender<WriteCommand<'js>>,
command_rx: Option<UnboundedReceiver<WriteCommand<'js>>>,
is_finished: bool,
#[allow(dead_code)]
errored: bool,
emit_close: bool,
is_destroyed: bool,
Expand All @@ -55,6 +56,7 @@ impl<'js> WritableStreamInner<'js> {
}

#[derive(Debug)]
#[allow(dead_code)]
pub enum WriteCommand<'js> {
End,
Write(Vec<u8>, Option<Function<'js>>, bool),
Expand Down
216 changes: 133 additions & 83 deletions src/timers.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,67 @@
use std::{
sync::Arc,
time::{Duration, Instant},
mem,
ptr::NonNull,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use rquickjs::{
function::Opt,
module::{Declarations, Exports, ModuleDef},
prelude::Func,
Class, Ctx, Function, Result,
qjs, Ctx, Exception, Function, Result,
};
use tokio::sync::Notify;

use crate::{util::export_default, vm::CtxExtension};

#[rquickjs::class]
#[derive(rquickjs::class::Trace)]
struct Timeout {
#[qjs(skip_trace)]
abort: Arc<Notify>,
static TIMER_ID: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
struct Timeout<'js> {
cb: Option<Function<'js>>,
timeout: usize,
id: usize,
repeating: bool,
delay: usize,
}

fn clear_timeout(_ctx: Ctx<'_>, timeout: Class<Timeout>) -> Result<()> {
timeout.borrow().abort.notify_one();
fn set_immediate(cb: Function) -> Result<()> {
cb.defer::<()>(())?;
Ok(())
}

async fn yield_sleep(duration: Duration) {
if duration.as_millis() == 0 {
tokio::task::yield_now().await;
return;
}
let start_time = Instant::now();
while Instant::now() - start_time < duration {
tokio::task::yield_now().await;
}
fn get_current_time_millis() -> usize {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|t| t.as_millis() as usize)
.unwrap_or(0)
}

fn set_timeout_interval<'js>(
ctx: Ctx<'js>,
cb: Function<'js>,
msec: Option<u64>,
interval: bool,
) -> Result<Class<'js, Timeout>> {
let msec = msec.unwrap_or(0);

let abort = Arc::new(Notify::new());
let abort_ref = abort.clone();

ctx.spawn_exit(async move {
loop {
let abort = abort_ref.clone();

let aborted;

tokio::select! {
_ = abort.notified() => {
aborted = true;
},
_ = yield_sleep(Duration::from_millis(msec)) => {
aborted = false;
}
}

if !aborted {
cb.call::<(), ()>(())?;
}

if !interval || aborted {
break;
}
}
drop(cb);
drop(abort_ref);
Ok(())
})?;

Class::instance(ctx, Timeout { abort })
}

fn set_timeout<'js>(
ctx: Ctx<'js>,
timeouts: &Arc<Mutex<Vec<Timeout<'js>>>>,
cb: Function<'js>,
msec: Opt<u64>,
) -> Result<Class<'js, Timeout>> {
set_timeout_interval(ctx, cb, msec.0, false)
delay: usize,
repeating: bool,
) -> usize {
let timeout = get_current_time_millis() + delay;
let id = TIMER_ID.fetch_add(1, Ordering::SeqCst);
timeouts.lock().unwrap().push(Timeout {
cb: Some(cb),
timeout,
id,
repeating,
delay,
});
id
}

fn set_interval<'js>(
ctx: Ctx<'js>,
cb: Function<'js>,
msec: Opt<u64>,
) -> Result<Class<'js, Timeout>> {
set_timeout_interval(ctx, cb, msec.0, true)
}

fn set_immediate(cb: Function) -> Result<()> {
cb.defer::<()>(())?;
Ok(())
fn clear_timeout_interval(timeouts: &Arc<Mutex<Vec<Timeout>>>, id: usize) {
let mut timeouts = timeouts.lock().unwrap();
if let Some(index) = timeouts.iter().position(|t| t.id == id) {
timeouts.remove(index);
}
}

pub struct TimersModule;
Expand Down Expand Up @@ -130,12 +95,97 @@ impl ModuleDef for TimersModule {
pub fn init(ctx: &Ctx<'_>) -> Result<()> {
let globals = ctx.globals();

Class::<Timeout>::register(ctx)?;
let timeouts = Arc::new(Mutex::new(Vec::<Timeout>::new()));
let timeouts2 = timeouts.clone();
let timeouts3 = timeouts.clone();
let timeouts4 = timeouts.clone();
let timeouts5 = timeouts.clone();

globals.set(
"setTimeout",
Func::from(move |cb, delay| set_timeout_interval(&timeouts, cb, delay, false)),
)?;

globals.set(
"setInterval",
Func::from(move |cb, delay| set_timeout_interval(&timeouts2, cb, delay, true)),
)?;

globals.set(
"clearTimeout",
Func::from(move |id: usize| clear_timeout_interval(&timeouts3, id)),
)?;

globals.set(
"clearInterval",
Func::from(move |id: usize| clear_timeout_interval(&timeouts4, id)),
)?;

let ctx2 = ctx.clone();

ctx.spawn_exit(async move {
let raw_ctx = ctx2.as_raw();
let rt: *mut qjs::JSRuntime = unsafe { qjs::JS_GetRuntime(raw_ctx.as_ptr()) };
let mut ctx_ptr = mem::MaybeUninit::<*mut qjs::JSContext>::uninit();

let mut interval = tokio::time::interval(Duration::from_millis(1));
let mut to_call = Some(Vec::new());
loop {
interval.tick().await;
let mut call_vec = to_call.take().unwrap(); //avoid creating a new vec
let current_time = get_current_time_millis();
let mut had_items = false;

timeouts5.lock().unwrap().retain_mut(|timeout| {
had_items = true;
if current_time > timeout.timeout {
if !timeout.repeating {
//do not clone if not not repeating
call_vec.push(timeout.cb.take());
return false;
}
timeout.timeout = current_time + timeout.delay;
call_vec.push(timeout.cb.clone());
}
true
});

for cb in call_vec.iter_mut() {
if let Some(cb) = cb.take() {
cb.call::<(), ()>(())?;
};
}

call_vec.clear();
to_call.replace(call_vec);

let result = unsafe { qjs::JS_ExecutePendingJob(rt, ctx_ptr.as_mut_ptr()) };

if result < 0 {
let js_context = unsafe { ctx_ptr.assume_init() };
let ctx_ptr = NonNull::new(js_context)
.expect("executing pending job returned a null context on error");

let ctx = unsafe { Ctx::from_raw(ctx_ptr) };

let err = ctx.catch();

if let Some(x) = err.clone().into_object().and_then(Exception::from_object) {
Err(x.throw())?;
} else {
Err(ctx.throw(err))?;
}
}

if !had_items && result == 0 {
break;
}
}
timeouts5.lock().unwrap().clear();

Ok(())
})?;

globals.set("setTimeout", Func::from(set_timeout))?;
globals.set("clearTimeout", Func::from(clear_timeout))?;
globals.set("setInterval", Func::from(set_interval))?;
globals.set("clearInterval", Func::from(clear_timeout))?;
globals.set("setImmediate", Func::from(set_immediate))?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ impl Vm {

pub async fn run_and_handle_exceptions<'js, F>(ctx: &AsyncContext, f: F)
where
F: FnOnce(Ctx) -> rquickjs::Result<()>,
F: FnOnce(Ctx) -> rquickjs::Result<()> + Send,
{
ctx.with(|ctx| {
f(ctx.clone())
Expand Down Expand Up @@ -452,7 +452,7 @@ impl Vm {
}
}

fn init(ctx: &Ctx, module_names: HashSet<&'static str>) -> Result<()> {
fn init(ctx: &Ctx<'_>, module_names: HashSet<&'static str>) -> Result<()> {
let globals = ctx.globals();

globals.set("global", ctx.globals())?;
Expand Down
6 changes: 6 additions & 0 deletions tests/timers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ describe("timers", () => {
assert.ok(end - start >= 10);
});

it("should set nested timeout", (done) => {
setTimeout(() => {
setTimeout(done, 10);
}, 10);
});

it("should clear timeout", async () => {
const start = Date.now();
let status = "";
Expand Down

0 comments on commit f2d39ac

Please sign in to comment.