Skip to content

Commit

Permalink
backend/epoll: implement eventfd wakeup notification
Browse files Browse the repository at this point in the history
Tries to mimic what happens in backend/kqueue.

Closes mitchellh#4
  • Loading branch information
steeve committed Nov 19, 2024
1 parent 7855c4f commit 36382a4
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/backend/epoll.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub const Loop = struct {

fd: posix.fd_t,

/// The eventfd that this epoll queue always has a filter for. Writing
/// an empty message to this eventfd can be used to wake up the loop
/// at any time. Waking up the loop via this eventfd won't trigger any
/// particular completion, it just forces tick to cycle.
eventfd: xev.Async,

/// The number of active completions. This DOES NOT include completions that
/// are queued in the submissions queue.
active: usize = 0,
Expand Down Expand Up @@ -56,8 +62,12 @@ pub const Loop = struct {
} = .{},

pub fn init(options: xev.Options) !Loop {
var eventfd = try xev.Async.init();
errdefer eventfd.deinit();

var res: Loop = .{
.fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC),
.eventfd = eventfd,
.thread_pool = options.thread_pool,
.thread_pool_completions = undefined,
.cached_now = undefined,
Expand All @@ -68,6 +78,7 @@ pub const Loop = struct {

pub fn deinit(self: *Loop) void {
posix.close(self.fd);
self.eventfd.deinit();
}

/// Run the event loop. See RunMode documentation for details on modes.
Expand Down Expand Up @@ -262,9 +273,26 @@ pub const Loop = struct {
// Initialize
if (!self.flags.init) {
self.flags.init = true;

if (self.thread_pool != null) {
self.thread_pool_completions.init();
}

var ev: linux.epoll_event = .{
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
.data = .{ .ptr = 0 },
};
posix.epoll_ctl(
self.fd,
linux.EPOLL.CTL_ADD,
self.eventfd.fd,
&ev,
) catch |err| {
// We reset initialization because we can't do anything
// safely unless we get this mach port registered!
self.flags.init = false;
return err;
};
}

// Submit all the submissions. We copy the submission queue so that
Expand Down Expand Up @@ -369,6 +397,10 @@ pub const Loop = struct {

// Process all our events and invoke their completion handlers
for (events[0..n]) |ev| {
// Zero data values are internal events that we do nothing
// on such as the eventfd wakeup.
if (ev.data.ptr == 0) continue;

const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));

// We get the fd and mark this as in progress we can properly
Expand Down Expand Up @@ -415,6 +447,7 @@ pub const Loop = struct {
const pool = self.thread_pool orelse return error.ThreadPoolRequired;

// Setup our completion state so that thread_perform can do stuff
c.task_loop = self;
c.task_completions = &self.thread_pool_completions;
c.task = .{ .callback = Loop.thread_perform };

Expand All @@ -436,6 +469,14 @@ pub const Loop = struct {

// Add to our completion queue
c.task_completions.push(c);

// Wake up our main loop
c.task_loop.wakeup() catch {};
}

/// Sends an empty message to this loop's eventfd so that it wakes up.
fn wakeup(self: *Loop) !void {
try self.eventfd.notify();
}

fn start(self: *Loop, completion: *Completion) void {
Expand Down Expand Up @@ -800,6 +841,7 @@ pub const Completion = struct {
/// reliable way to get access to the loop and shouldn't be used
/// except internally.
task: ThreadPool.Task = undefined,
task_loop: *Loop = undefined,
task_completions: *Loop.TaskCompletionQueue = undefined,
task_result: Result = undefined,

Expand Down

0 comments on commit 36382a4

Please sign in to comment.