From 0d1c2f8258072148459d3114b9ccaf43c02e0958 Mon Sep 17 00:00:00 2001 From: Steeve Morin Date: Tue, 19 Nov 2024 16:14:14 +0100 Subject: [PATCH] backend/epoll: implement eventfd wakeup notification Tries to mimic what happens in backend/kqueue. Closes #4 --- src/backend/epoll.zig | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig index ae4ec7d..f44d326 100644 --- a/src/backend/epoll.zig +++ b/src/backend/epoll.zig @@ -21,6 +21,12 @@ pub const Loop = struct { fd: posix.fd_t, + /// The eventfd that this epoll queue always has a filter for. Writing + /// an empty message to this eventfd can be used to wake up the loop + /// at any time. Waking up the loop via this eventfd won't trigger any + /// particular completion, it just forces tick to cycle. + eventfd: xev.Async, + /// The number of active completions. This DOES NOT include completions that /// are queued in the submissions queue. active: usize = 0, @@ -56,8 +62,12 @@ pub const Loop = struct { } = .{}, pub fn init(options: xev.Options) !Loop { + var eventfd = try xev.Async.init(); + errdefer eventfd.deinit(); + var res: Loop = .{ .fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC), + .eventfd = eventfd, .thread_pool = options.thread_pool, .thread_pool_completions = undefined, .cached_now = undefined, @@ -68,6 +78,7 @@ pub const Loop = struct { pub fn deinit(self: *Loop) void { posix.close(self.fd); + self.eventfd.deinit(); } /// Run the event loop. See RunMode documentation for details on modes. @@ -262,9 +273,26 @@ pub const Loop = struct { // Initialize if (!self.flags.init) { self.flags.init = true; + if (self.thread_pool != null) { self.thread_pool_completions.init(); } + + var ev: linux.epoll_event = .{ + .events = linux.EPOLL.IN | linux.EPOLL.RDHUP, + .data = .{ .ptr = 0 }, + }; + posix.epoll_ctl( + self.fd, + linux.EPOLL.CTL_ADD, + self.eventfd.fd, + &ev, + ) catch |err| { + // We reset initialization because we can't do anything + // safely unless we get this mach port registered! + self.flags.init = false; + return err; + }; } // Submit all the submissions. We copy the submission queue so that @@ -369,6 +397,10 @@ pub const Loop = struct { // Process all our events and invoke their completion handlers for (events[0..n]) |ev| { + // Zero data values are internal events that we do nothing + // on such as the eventfd wakeup. + if (ev.data.ptr == 0) continue; + const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr))); // We get the fd and mark this as in progress we can properly @@ -415,6 +447,7 @@ pub const Loop = struct { const pool = self.thread_pool orelse return error.ThreadPoolRequired; // Setup our completion state so that thread_perform can do stuff + c.task_loop = self; c.task_completions = &self.thread_pool_completions; c.task = .{ .callback = Loop.thread_perform }; @@ -436,6 +469,14 @@ pub const Loop = struct { // Add to our completion queue c.task_completions.push(c); + + // Wake up our main loop + c.task_loop.wakeup() catch {}; + } + + /// Sends an empty message to this loop's eventfd so that it wakes up. + fn wakeup(self: *Loop) !void { + try self.eventfd.notify(); } fn start(self: *Loop, completion: *Completion) void { @@ -800,6 +841,7 @@ pub const Completion = struct { /// reliable way to get access to the loop and shouldn't be used /// except internally. task: ThreadPool.Task = undefined, + task_loop: *Loop = undefined, task_completions: *Loop.TaskCompletionQueue = undefined, task_result: Result = undefined,