From 58f7859b03225617023fa165b74bea9ba8d2969e Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Mon, 6 Nov 2023 10:12:12 -0300 Subject: [PATCH] Use closed and close_queue methods on drop --- src/consumer/base_consumer.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index b2518b73f..a6d1e39a1 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -716,15 +716,12 @@ where fn drop(&mut self) { trace!("Destroying consumer: {:?}", self.client.native_ptr()); if self.group_id.is_some() { - let err = unsafe { - rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) - }; - if !err.is_null() { - error!("Failed to close the consumer queue on drop"); - } - - while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { - self.poll(Duration::from_millis(100)); + if let Err(err) = self.close_queue() { + error!("Failed to close consumer queue on drop: {}", err); + } else { + while !self.closed() { + self.poll(Duration::from_millis(100)); + } } } trace!("Consumer destroyed: {:?}", self.client.native_ptr());