Skip to content

Commit

Permalink
Use closed and close_queue methods on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
scanterog committed Nov 6, 2023
1 parent 5686185 commit 58f7859
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,12 @@ where
fn drop(&mut self) {
trace!("Destroying consumer: {:?}", self.client.native_ptr());
if self.group_id.is_some() {
let err = unsafe {
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
};
if !err.is_null() {
error!("Failed to close the consumer queue on drop");
}

while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
self.poll(Duration::from_millis(100));
if let Err(err) = self.close_queue() {
error!("Failed to close consumer queue on drop: {}", err);
} else {
while !self.closed() {
self.poll(Duration::from_millis(100));
}
}
}
trace!("Consumer destroyed: {:?}", self.client.native_ptr());
Expand Down

0 comments on commit 58f7859

Please sign in to comment.