diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index a6d1e39a1..5fcc42bc5 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -225,16 +225,16 @@ where fn handle_error_event(&self, event: NativePtr) -> Option { let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; if rdkafka_err.is_error() { - let err = match rdkafka_err { - rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => { - let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; - let partition = unsafe { (*tp_ptr).partition }; - unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; - KafkaError::PartitionEOF(partition) - } - e => KafkaError::MessageConsumption(e.into()), - }; - Some(err) + if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF { + let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; + let partition = unsafe { (*tp_ptr).partition }; + unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; + Some(KafkaError::PartitionEOF(partition)) + } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 { + Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into())) + } else { + Some(KafkaError::MessageConsumption(rdkafka_err.into())) + } } else { None } diff --git a/src/error.rs b/src/error.rs index 46a40be54..312a6bb65 100644 --- a/src/error.rs +++ b/src/error.rs @@ -157,6 +157,8 @@ pub enum KafkaError { GroupListFetch(RDKafkaErrorCode), /// Message consumption failed. MessageConsumption(RDKafkaErrorCode), + /// Message consumption failed with fatal error. + MessageConsumptionFatal(RDKafkaErrorCode), /// Message production error. MessageProduction(RDKafkaErrorCode), /// Metadata fetch error. @@ -217,6 +219,9 @@ impl fmt::Debug for KafkaError { KafkaError::MessageConsumption(err) => { write!(f, "KafkaError (Message consumption error: {})", err) } + KafkaError::MessageConsumptionFatal(err) => { + write!(f, "(Fatal) KafkaError (Message consumption error: {})", err) + } KafkaError::MessageProduction(err) => { write!(f, "KafkaError (Message production error: {})", err) } @@ -265,6 +270,9 @@ impl fmt::Display for KafkaError { KafkaError::Global(err) => write!(f, "Global error: {}", err), KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err), KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err), + KafkaError::MessageConsumptionFatal(err) => { + write!(f, "(Fatal) Message consumption error: {}", err) + } KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), KafkaError::NoMessageReceived => { @@ -299,6 +307,7 @@ impl Error for KafkaError { KafkaError::Global(err) => Some(err), KafkaError::GroupListFetch(err) => Some(err), KafkaError::MessageConsumption(err) => Some(err), + KafkaError::MessageConsumptionFatal(err) => Some(err), KafkaError::MessageProduction(err) => Some(err), KafkaError::MetadataFetch(err) => Some(err), KafkaError::NoMessageReceived => None, @@ -339,6 +348,7 @@ impl KafkaError { KafkaError::Global(err) => Some(*err), KafkaError::GroupListFetch(err) => Some(*err), KafkaError::MessageConsumption(err) => Some(*err), + KafkaError::MessageConsumptionFatal(err) => Some(*err), KafkaError::MessageProduction(err) => Some(*err), KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None,