Skip to content

Commit

Permalink
Allow Deadline to express Timeout::Never losslessly.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidblewett committed Dec 11, 2024
1 parent 1957d2f commit c4e1552
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Utility functions and types.
use std::cmp;
use std::ffi::CStr;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -31,19 +32,29 @@ pub fn get_rdkafka_version() -> (i32, String) {
(version_number, c_str.to_string_lossy().into_owned())
}

pub(crate) struct Deadline {
deadline: Instant,
pub(crate) enum Deadline {
At(Instant),
Never,
}

impl Deadline {
pub(crate) fn new(duration: Duration) -> Self {
Self {
deadline: Instant::now() + duration,
// librdkafka's flush api requires an i32 millisecond timeout
const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);

pub(crate) fn new(duration: Option<Duration>) -> Self {
if let Some(d) = duration {
Self::At(Instant::now() + d)
} else {
Self::Never
}
}

pub(crate) fn remaining(&self) -> Duration {
self.deadline - Instant::now()
if let Deadline::At(i) = self {
*i - Instant::now()
} else {
Duration::MAX
}
}

pub(crate) fn elapsed(&self) -> bool {
Expand Down Expand Up @@ -99,22 +110,30 @@ impl std::ops::SubAssign for Timeout {
impl From<Timeout> for Deadline {
fn from(t: Timeout) -> Deadline {
if let Timeout::After(dur) = t {
Deadline::new(dur)
Deadline::new(Some(cmp::min(Deadline::MAX_FLUSH_DURATION, dur)))
} else {
Deadline::new(Duration::MAX)
Deadline::new(None)
}
}
}

impl From<Deadline> for Timeout {
fn from(d: Deadline) -> Timeout {
Timeout::After(d.remaining())
if let Deadline::Never = d {
Timeout::Never
} else {
Timeout::After(d.remaining())
}
}
}

impl From<&Deadline> for Timeout {
fn from(d: &Deadline) -> Timeout {
Timeout::After(d.remaining())
if let Deadline::Never = d {
Timeout::Never
} else {
Timeout::After(d.remaining())
}
}
}

Expand Down

0 comments on commit c4e1552

Please sign in to comment.