Skip to content

Commit

Permalink
refactor all resources ids to be sequential numbers
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Dec 18, 2023
1 parent 0d30845 commit c401591
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 108 deletions.
13 changes: 7 additions & 6 deletions src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
pub mod popol;

use std::fmt::{self, Display, Formatter};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::AsRawFd;
use std::time::Duration;
use std::{io, ops};

use crate::resource::Io;
use crate::ResourceId;

/// Information about I/O events which has happened for a resource.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
Expand Down Expand Up @@ -147,19 +148,19 @@ pub enum IoFail {
/// To read I/O events from the engine please use its Iterator interface.
pub trait Poll
where
Self: Send + Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
for<'a> &'a mut Self: Iterator<Item = (RawFd, Result<IoType, IoFail>)>,
Self: Send + Iterator<Item = (ResourceId, Result<IoType, IoFail>)>,
for<'a> &'a mut Self: Iterator<Item = (ResourceId, Result<IoType, IoFail>)>,
{
/// Waker type used by the poll provider.
type Waker: Waker;

/// Registers a file-descriptor based resource for a poll.
fn register(&mut self, fd: &impl AsRawFd, interest: IoType);
fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId;
/// Unregisters a file-descriptor based resource from a poll.
fn unregister(&mut self, fd: &impl AsRawFd);
fn unregister(&mut self, id: ResourceId);
/// Subscribes for a specific set of events for a given file descriptor-backed resource (see
/// [`IoType`] for the details on event subscription).
fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool;
fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool;

/// Runs single poll syscall over all registered resources with an optional timeout.
///
Expand Down
47 changes: 27 additions & 20 deletions src/poller/popol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use std::sync::Arc;
use std::time::Duration;

use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::ResourceId;

/// Manager for a set of reactor which are polled for an event loop by the
/// re-actor by using [`popol`] library.
pub struct Poller {
poll: popol::Sources<RawFd>,
events: VecDeque<popol::Event<RawFd>>,
poll: popol::Sources<ResourceId>,
events: VecDeque<popol::Event<ResourceId>>,
id_top: ResourceId,
}

impl Default for Poller {
Expand All @@ -48,6 +50,7 @@ impl Poller {
Self {
poll: popol::Sources::new(),
events: empty!(),
id_top: 0,
}
}

Expand All @@ -57,33 +60,37 @@ impl Poller {
Self {
poll: popol::Sources::with_capacity(capacity),
events: VecDeque::with_capacity(capacity),
id_top: 0,
}
}
}

impl Poll for Poller {
type Waker = PopolWaker;

fn register(&mut self, fd: &impl AsRawFd, interest: IoType) {
fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId {
let id = self.id_top;
self.id_top += 1;

#[cfg(feature = "log")]
log::trace!(target: "popol", "Registering {}", fd.as_raw_fd());
self.poll.register(fd.as_raw_fd(), fd, interest.into());
log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id);

self.poll.register(id, fd, interest.into());
id
}

fn unregister(&mut self, fd: &impl AsRawFd) {
fn unregister(&mut self, id: ResourceId) {
#[cfg(feature = "log")]
log::trace!(target: "popol", "Unregistering {}", fd.as_raw_fd());
self.poll.unregister(&fd.as_raw_fd());
log::trace!(target: "popol", "Unregistering {}", id);
self.poll.unregister(&id);
}

fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool {
let fd = fd.as_raw_fd();

fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool {
#[cfg(feature = "log")]
log::trace!(target: "popol", "Setting interest `{interest}` on {}", fd);
log::trace!(target: "popol", "Setting interest `{interest}` on {}", id);

self.poll.unset(&fd, (!interest).into());
self.poll.set(&fd, interest.into())
self.poll.unset(&id, (!interest).into());
self.poll.set(&id, interest.into())
}

fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
Expand Down Expand Up @@ -115,21 +122,21 @@ impl Poll for Poller {
}

impl Iterator for Poller {
type Item = (RawFd, Result<IoType, IoFail>);
type Item = (ResourceId, Result<IoType, IoFail>);

fn next(&mut self) -> Option<Self::Item> {
let event = self.events.pop_front()?;

let fd = event.key;
let id = event.key;
let fired = event.raw_events();
let res = if event.is_hangup() {
#[cfg(feature = "log")]
log::trace!(target: "popol", "Hangup on {fd}");
log::trace!(target: "popol", "Hangup on {id}");

Err(IoFail::Connectivity(fired))
} else if event.is_error() || event.is_invalid() {
#[cfg(feature = "log")]
log::trace!(target: "popol", "OS error on {fd} (fired events {fired:#b})");
log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})");

Err(IoFail::Os(fired))
} else {
Expand All @@ -139,11 +146,11 @@ impl Iterator for Poller {
};

#[cfg(feature = "log")]
log::trace!(target: "popol", "I/O event on {fd}: {io}");
log::trace!(target: "popol", "I/O event on {id}: {io}");

Ok(io)
};
Some((fd, res))
Some((id, res))
}
}

Expand Down
Loading

0 comments on commit c401591

Please sign in to comment.