Skip to content

Commit

Permalink
Propagate fatal errors
Browse files Browse the repository at this point in the history
With the Event API we propagate generic client instance-level errors,
such as broker connection failures, authentication issues, etc.

However, fatal errors are also propagated via the Event API. These
indicates that the particular instance of the client (producer/consumer)
becomes non-functional.
  • Loading branch information
scanterog committed Nov 6, 2023
1 parent 58f7859 commit 113af85
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
20 changes: 10 additions & 10 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,16 @@ where
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
if rdkafka_err.is_error() {
let err = match rdkafka_err {
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
KafkaError::PartitionEOF(partition)
}
e => KafkaError::MessageConsumption(e.into()),
};
Some(err)
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
Some(KafkaError::PartitionEOF(partition))
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
} else {
Some(KafkaError::MessageConsumption(rdkafka_err.into()))
}
} else {
None
}
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ pub enum KafkaError {
GroupListFetch(RDKafkaErrorCode),
/// Message consumption failed.
MessageConsumption(RDKafkaErrorCode),
/// Message consumption failed with fatal error.
MessageConsumptionFatal(RDKafkaErrorCode),
/// Message production error.
MessageProduction(RDKafkaErrorCode),
/// Metadata fetch error.
Expand Down Expand Up @@ -217,6 +219,9 @@ impl fmt::Debug for KafkaError {
KafkaError::MessageConsumption(err) => {
write!(f, "KafkaError (Message consumption error: {})", err)
}
KafkaError::MessageConsumptionFatal(err) => {
write!(f, "(Fatal) KafkaError (Message consumption error: {})", err)
}
KafkaError::MessageProduction(err) => {
write!(f, "KafkaError (Message production error: {})", err)
}
Expand Down Expand Up @@ -265,6 +270,9 @@ impl fmt::Display for KafkaError {
KafkaError::Global(err) => write!(f, "Global error: {}", err),
KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
KafkaError::MessageConsumptionFatal(err) => {
write!(f, "(Fatal) Message consumption error: {}", err)
}
KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
KafkaError::NoMessageReceived => {
Expand Down Expand Up @@ -299,6 +307,7 @@ impl Error for KafkaError {
KafkaError::Global(err) => Some(err),
KafkaError::GroupListFetch(err) => Some(err),
KafkaError::MessageConsumption(err) => Some(err),
KafkaError::MessageConsumptionFatal(err) => Some(err),
KafkaError::MessageProduction(err) => Some(err),
KafkaError::MetadataFetch(err) => Some(err),
KafkaError::NoMessageReceived => None,
Expand Down Expand Up @@ -339,6 +348,7 @@ impl KafkaError {
KafkaError::Global(err) => Some(*err),
KafkaError::GroupListFetch(err) => Some(*err),
KafkaError::MessageConsumption(err) => Some(*err),
KafkaError::MessageConsumptionFatal(err) => Some(*err),
KafkaError::MessageProduction(err) => Some(*err),
KafkaError::MetadataFetch(err) => Some(*err),
KafkaError::NoMessageReceived => None,
Expand Down

0 comments on commit 113af85

Please sign in to comment.