diff --git a/src/poller/mod.rs b/src/poller/mod.rs index 7d51d08..cd7b2e2 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -27,11 +27,12 @@ pub mod popol; use std::fmt::{self, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::time::Duration; use std::{io, ops}; use crate::resource::Io; +use crate::ResourceId; /// Information about I/O events which has happened for a resource. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] @@ -147,19 +148,19 @@ pub enum IoFail { /// To read I/O events from the engine please use its Iterator interface. pub trait Poll where - Self: Send + Iterator)>, - for<'a> &'a mut Self: Iterator)>, + Self: Send + Iterator)>, + for<'a> &'a mut Self: Iterator)>, { /// Waker type used by the poll provider. type Waker: Waker; /// Registers a file-descriptor based resource for a poll. - fn register(&mut self, fd: &impl AsRawFd, interest: IoType); + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId; /// Unregisters a file-descriptor based resource from a poll. - fn unregister(&mut self, fd: &impl AsRawFd); + fn unregister(&mut self, id: ResourceId); /// Subscribes for a specific set of events for a given file descriptor-backed resource (see /// [`IoType`] for the details on event subscription). - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool; + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool; /// Runs single poll syscall over all registered resources with an optional timeout. /// diff --git a/src/poller/popol.rs b/src/poller/popol.rs index c699fbc..ee28168 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -30,12 +30,14 @@ use std::sync::Arc; use std::time::Duration; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; +use crate::ResourceId; /// Manager for a set of reactor which are polled for an event loop by the /// re-actor by using [`popol`] library. pub struct Poller { - poll: popol::Sources, - events: VecDeque>, + poll: popol::Sources, + events: VecDeque>, + id_top: ResourceId, } impl Default for Poller { @@ -48,6 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), + id_top: 0, } } @@ -57,6 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), + id_top: 0, } } } @@ -64,26 +68,29 @@ impl Poller { impl Poll for Poller { type Waker = PopolWaker; - fn register(&mut self, fd: &impl AsRawFd, interest: IoType) { + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { + let id = self.id_top; + self.id_top += 1; + #[cfg(feature = "log")] - log::trace!(target: "popol", "Registering {}", fd.as_raw_fd()); - self.poll.register(fd.as_raw_fd(), fd, interest.into()); + log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); + + self.poll.register(id, fd, interest.into()); + id } - fn unregister(&mut self, fd: &impl AsRawFd) { + fn unregister(&mut self, id: ResourceId) { #[cfg(feature = "log")] - log::trace!(target: "popol", "Unregistering {}", fd.as_raw_fd()); - self.poll.unregister(&fd.as_raw_fd()); + log::trace!(target: "popol", "Unregistering {}", id); + self.poll.unregister(&id); } - fn set_interest(&mut self, fd: &impl AsRawFd, interest: IoType) -> bool { - let fd = fd.as_raw_fd(); - + fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool { #[cfg(feature = "log")] - log::trace!(target: "popol", "Setting interest `{interest}` on {}", fd); + log::trace!(target: "popol", "Setting interest `{interest}` on {}", id); - self.poll.unset(&fd, (!interest).into()); - self.poll.set(&fd, interest.into()) + self.poll.unset(&id, (!interest).into()); + self.poll.set(&id, interest.into()) } fn poll(&mut self, timeout: Option) -> io::Result { @@ -115,21 +122,21 @@ impl Poll for Poller { } impl Iterator for Poller { - type Item = (RawFd, Result); + type Item = (ResourceId, Result); fn next(&mut self) -> Option { let event = self.events.pop_front()?; - let fd = event.key; + let id = event.key; let fired = event.raw_events(); let res = if event.is_hangup() { #[cfg(feature = "log")] - log::trace!(target: "popol", "Hangup on {fd}"); + log::trace!(target: "popol", "Hangup on {id}"); Err(IoFail::Connectivity(fired)) } else if event.is_error() || event.is_invalid() { #[cfg(feature = "log")] - log::trace!(target: "popol", "OS error on {fd} (fired events {fired:#b})"); + log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})"); Err(IoFail::Os(fired)) } else { @@ -139,11 +146,11 @@ impl Iterator for Poller { }; #[cfg(feature = "log")] - log::trace!(target: "popol", "I/O event on {fd}: {io}"); + log::trace!(target: "popol", "I/O event on {id}: {io}"); Ok(io) }; - Some((fd, res)) + Some((id, res)) } } diff --git a/src/reactor.rs b/src/reactor.rs index e17ee3e..cd40d6b 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display, Formatter}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::thread::JoinHandle; use std::time::Duration; use std::{io, thread}; @@ -32,7 +32,7 @@ use crossbeam_channel as chan; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; -use crate::{Resource, Timer, Timestamp, WriteAtomic}; +use crate::{Resource, ResourceId, Timer, Timestamp, WriteAtomic}; /// Maximum amount of time to wait for I/O. const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); @@ -42,10 +42,10 @@ const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60); #[display(doc_comments)] pub enum Error { /// transport {0} got disconnected during poll operation. - ListenerDisconnect(L::Id, L), + ListenerDisconnect(ResourceId, L), /// transport {0} got disconnected during poll operation. - TransportDisconnect(T::Id, T), + TransportDisconnect(ResourceId, T), /// polling multiple reactor has failed. Details: {0:?} Poll(io::Error), @@ -81,7 +81,7 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_listener")] - UnregisterListener(L::Id), + UnregisterListener(ResourceId), /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via /// [`Handler::handover_transport`]. @@ -90,11 +90,11 @@ pub enum Action { /// closed, listener is not unbound, connections are not closed etc. All these actions must be /// handled by the handler upon the handover event. #[display("unregister_transport")] - UnregisterTransport(T::Id), + UnregisterTransport(ResourceId), /// Write the data to one of the transport resources using [`io::Write`]. #[display("send_to({0})")] - Send(T::Id, Vec), + Send(ResourceId, Vec), /// Set a new timer for a given duration from this moment. /// @@ -143,7 +143,7 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); @@ -151,7 +151,7 @@ pub trait Handler: Send + Iterator::Id, + id: ResourceId, event: ::Event, time: Timestamp, ); @@ -271,7 +271,7 @@ impl Reactor { let thread = builder.spawn(move || { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - poller.register(&waker_reader, IoType::read_only()); + let waker_id = poller.register(&waker_reader, IoType::read_only()); let runtime = Runtime { service, @@ -280,9 +280,8 @@ impl Reactor { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, + waker_id, timeouts: Timer::new(), }; @@ -390,11 +389,10 @@ pub struct Runtime { poller: P, controller: Controller::Send>, ctl_recv: chan::Receiver>, - listener_map: HashMap::Id>, - transport_map: HashMap::Id>, - listeners: HashMap<::Id, H::Listener>, - transports: HashMap<::Id, H::Transport>, + listeners: HashMap, + transports: HashMap, waker: ::Recv, + waker_id: ResourceId, timeouts: Timer, } @@ -422,9 +420,8 @@ impl Runtime { ctl_recv, listeners: empty!(), transports: empty!(), - listener_map: empty!(), - transport_map: empty!(), waker: waker_reader, + waker_id, timeouts: Timer::new(), }) } @@ -442,11 +439,11 @@ impl Runtime { let before_poll = Timestamp::now(); let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT); - for res in self.listeners.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.listeners { + self.poller.set_interest(*id, res.interests()); } - for res in self.transports.values() { - self.poller.set_interest(res, res.interests()); + for (id, res) in &self.transports { + self.poller.set_interest(*id, res.interests()); } // Blocking @@ -506,8 +503,8 @@ impl Runtime { let mut awoken = false; let mut unregister_queue = vec![]; - while let Some((fd, res)) = self.poller.next() { - if fd == self.waker.as_raw_fd() { + while let Some((id, res)) = self.poller.next() { + if id == self.waker_id { if let Err(err) = res { #[cfg(feature = "log")] log::error!(target: "reactor", "Polling waker has failed: {err}"); @@ -519,16 +516,16 @@ impl Runtime { self.waker.reset(); awoken = true; - } else if let Some(id) = self.listener_map.get(&fd) { + } else if self.listeners.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from listener {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); - let listener = self.listeners.get_mut(id).expect("resource disappeared"); + let listener = self.listeners.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = listener.handle_io(io) { - self.service.handle_listener_event(*id, event, time); + self.service.handle_listener_event(id, event, time); } } } @@ -536,26 +533,26 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})"); - let listener = self.listeners.remove(id).expect("resource disappeared"); - unregister_queue.push(listener.as_raw_fd()); - self.service.handle_error(Error::ListenerDisconnect(*id, listener)); + let listener = self.listeners.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::ListenerDisconnect(id, listener)); } Err(IoFail::Os(flags)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})"); - self.unregister_listener(*id); + self.unregister_listener(id); } } - } else if let Some(id) = self.transport_map.get(&fd) { + } else if self.transports.contains_key(&id) { match res { Ok(io) => { #[cfg(feature = "log")] - log::trace!(target: "reactor", "Got `{io}` event from transport {id} (fd={fd})"); + log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); - let transport = self.transports.get_mut(id).expect("resource disappeared"); + let transport = self.transports.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = transport.handle_io(io) { - self.service.handle_transport_event(*id, event, time); + self.service.handle_transport_event(id, event, time); } } } @@ -563,14 +560,14 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})"); - let transport = self.transports.remove(id).expect("resource disappeared"); - unregister_queue.push(transport.as_raw_fd()); - self.service.handle_error(Error::TransportDisconnect(*id, transport)); + let transport = self.transports.remove(&id).expect("resource disappeared"); + unregister_queue.push(id); + self.service.handle_error(Error::TransportDisconnect(id, transport)); } Err(IoFail::Os(posix_events)) => { #[cfg(feature = "log")] log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})"); - self.unregister_transport(*id); + self.unregister_transport(id); } } } else { @@ -581,8 +578,8 @@ impl Runtime { } // We need this b/c of borrow checker - for fd in unregister_queue { - self.poller.unregister(&fd); + for id in unregister_queue { + self.poller.unregister(id); } awoken @@ -615,30 +612,26 @@ impl Runtime { ) -> Result<(), Error> { match action { Action::RegisterListener(listener) => { - let id = listener.id(); let fd = listener.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering listener on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering listener with fd={fd}"); - self.poller.register(&listener, IoType::read_only()); + let id = self.poller.register(&listener, IoType::read_only()); self.listeners.insert(id, listener); - self.listener_map.insert(fd, id); } Action::RegisterTransport(transport) => { - let id = transport.id(); let fd = transport.as_raw_fd(); #[cfg(feature = "log")] - log::debug!(target: "reactor", "Registering transport on {id} (fd={fd})"); + log::debug!(target: "reactor", "Registering transport with fd={fd}"); - self.poller.register(&transport, IoType::read_only()); + let id = self.poller.register(&transport, IoType::read_only()); self.transports.insert(id, transport); - self.transport_map.insert(fd, id); } Action::UnregisterListener(id) => { let Some(listener) = self.unregister_listener(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id}"); @@ -646,7 +639,7 @@ impl Runtime { } Action::UnregisterTransport(id) => { let Some(transport) = self.unregister_transport(id) else { - return Ok(()) + return Ok(()); }; #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over transport {id}"); @@ -660,7 +653,7 @@ impl Runtime { #[cfg(feature = "log")] log::error!(target: "reactor", "Transport {id} is not in the reactor"); - return Ok(()) + return Ok(()); }; match transport.write_atomic(&data) { Err(WriteError::NotReady) => { @@ -699,11 +692,11 @@ impl Runtime { // We just drop here? } - fn unregister_listener(&mut self, id: ::Id) -> Option { + fn unregister_listener(&mut self, id: ResourceId) -> Option { let Some(listener) = self.listeners.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered listener {id}"); - return None + return None; }; let fd = listener.as_raw_fd(); @@ -711,19 +704,16 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})"); - self.listener_map - .remove(&fd) - .expect("listener index content doesn't match registered listeners"); - self.poller.unregister(&listener); + self.poller.unregister(id); Some(listener) } - fn unregister_transport(&mut self, id: ::Id) -> Option { + fn unregister_transport(&mut self, id: ResourceId) -> Option { let Some(transport) = self.transports.remove(&id) else { #[cfg(feature = "log")] log::warn!(target: "reactor", "Unregistering non-registered transport {id}"); - return None + return None; }; let fd = transport.as_raw_fd(); @@ -731,10 +721,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})"); - self.transport_map - .remove(&fd) - .expect("transport index content doesn't match registered transports"); - self.poller.unregister(&transport); + self.poller.unregister(id); Some(transport) } @@ -743,6 +730,7 @@ impl Runtime { #[cfg(test)] mod test { use std::io::stdout; + use std::os::fd::RawFd; use std::thread::sleep; use super::*; @@ -765,9 +753,7 @@ mod test { fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) } } impl Resource for DumbRes { - type Id = RawFd; type Event = (); - fn id(&self) -> Self::Id { self.0.as_raw_fd() } fn interests(&self) -> IoType { IoType::read_write() } fn handle_io(&mut self, _io: Io) -> Option { None } } @@ -815,7 +801,7 @@ mod test { } fn handle_listener_event( &mut self, - _d: ::Id, + _d: ResourceId, _event: ::Event, _time: Timestamp, ) { @@ -823,7 +809,7 @@ mod test { } fn handle_transport_event( &mut self, - _id: ::Id, + _id: ResourceId, _event: ::Event, _time: Timestamp, ) { diff --git a/src/resource.rs b/src/resource.rs index 8d56c7f..476f368 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -21,12 +21,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::hash::Hash; -use std::io::ErrorKind; +use std::io::{self, ErrorKind}; use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::RawFd; -use std::{io, net}; use crate::poller::IoType; @@ -40,19 +38,16 @@ pub enum Io { Write, } -/// Marker traits for types which can be used as a reactor-managed [`Resource`] identifiers. -pub trait ResourceId: Copy + Eq + Ord + Hash + Send + Debug + Display {} +/// The resource identifier must be globally unique and non-reusable object. Because of this, +/// things like [`RawFd`] and socket addresses can't operate like resource identifiers. +pub type ResourceId = u64; /// A resource which can be managed by the reactor. pub trait Resource: AsRawFd + WriteAtomic + Send { - /// Resource identifier type. - type Id: ResourceId; /// Events which resource may generate upon receiving I/O from the reactor via /// [`Self::handle_io`]. These events are passed to the reactor [`crate::Handler`]. type Event; - /// Method returning the [`ResourceId`]. - fn id(&self) -> Self::Id; /// Method informing the reactor which types of events this resource is subscribed for. fn interests(&self) -> IoType; @@ -61,9 +56,6 @@ pub trait Resource: AsRawFd + WriteAtomic + Send { fn handle_io(&mut self, io: Io) -> Option; } -impl ResourceId for net::SocketAddr {} -impl ResourceId for RawFd {} - /// Error during write operation for a reactor-managed [`Resource`]. #[derive(Debug, Display, Error, From)] pub enum WriteError {