Skip to content

Commit

Permalink
Introduce Deadline type to simplify BaseProducer::flush and
Browse files Browse the repository at this point in the history
`BaseProducer::poll`.
  • Loading branch information
davidblewett committed Dec 10, 2024
1 parent 204dbe2 commit 36d4ebf
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
35 changes: 16 additions & 19 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use std::time::Duration;

use rdkafka_sys as rdsys;
use rdkafka_sys::rd_kafka_vtype_t::*;
Expand All @@ -67,7 +67,7 @@ use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, NativePtr, Timeout};
use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};

pub use crate::message::DeliveryResult;

Expand Down Expand Up @@ -360,17 +360,16 @@ where
/// Regular calls to `poll` are required to process the events and execute
/// the message delivery callbacks.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
let mut remaining = if let Timeout::After(dur) = timeout.into() {
dur
let deadline = if let Timeout::After(dur) = timeout.into() {
Deadline::new(dur)
} else {
Duration::MAX
Deadline::new(Duration::MAX)
};
let mut attempt = 0;
while attempt >= 0 && remaining > Duration::ZERO {
let start = Instant::now();
while attempt >= 0 && !deadline.elapsed() {
let event = self
.client()
.poll_event(&self.queue, Timeout::After(remaining));
.poll_event(&self.queue, Timeout::After(deadline.remaining()));
if let EventPollResult::Event(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
Expand All @@ -384,7 +383,6 @@ where
}
}
}
remaining = remaining.saturating_sub(start.elapsed());
attempt += 1;
}
}
Expand Down Expand Up @@ -505,24 +503,23 @@ where
// As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for
// the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout.
fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
let mut remaining = if let Timeout::After(dur) = timeout.into() {
dur
let deadline = if let Timeout::After(dur) = timeout.into() {
Deadline::new(dur)
} else {
Duration::MAX
Deadline::new(Duration::MAX)
};
while self.in_flight_count() > 0 && remaining > Duration::ZERO {
let flush_start = Instant::now();
while self.in_flight_count() > 0 && !deadline.elapsed() {
let ret = unsafe {
// This cast to i32 will truncate to i32::MAX
rdsys::rd_kafka_flush(self.client().native_ptr(), remaining.as_millis() as i32)
rdsys::rd_kafka_flush(
self.client().native_ptr(),
deadline.remaining().as_millis() as i32,
)
};
if ret.is_error() {
return Err(KafkaError::Flush(ret.into()));
} else {
remaining = remaining.saturating_sub(flush_start.elapsed());
let poll_start = Instant::now();
self.poll(remaining);
remaining = remaining.saturating_sub(poll_start.elapsed());
self.poll(&deadline);
}
}
Ok(())
Expand Down
34 changes: 33 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::slice;
use std::sync::Arc;
#[cfg(feature = "naive-runtime")]
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

#[cfg(feature = "naive-runtime")]
use futures_channel::oneshot;
Expand All @@ -31,6 +31,26 @@ pub fn get_rdkafka_version() -> (i32, String) {
(version_number, c_str.to_string_lossy().into_owned())
}

pub(crate) struct Deadline {
deadline: Instant,
}

impl Deadline {
pub(crate) fn new(duration: Duration) -> Self {
Self {
deadline: Instant::now() + duration,
}
}

pub(crate) fn remaining(&self) -> Duration {
self.deadline - Instant::now()
}

pub(crate) fn elapsed(&self) -> bool {
self.remaining() <= Duration::ZERO
}
}

/// Specifies a timeout for a Kafka operation.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum Timeout {
Expand Down Expand Up @@ -76,6 +96,18 @@ impl std::ops::SubAssign for Timeout {
}
}

impl From<Deadline> for Timeout {
fn from(d: Deadline) -> Timeout {
Timeout::After(d.remaining())
}
}

impl From<&Deadline> for Timeout {
fn from(d: &Deadline) -> Timeout {
Timeout::After(d.remaining())
}
}

impl From<Duration> for Timeout {
fn from(d: Duration) -> Timeout {
Timeout::After(d)
Expand Down

0 comments on commit 36d4ebf

Please sign in to comment.