Kafka perf improvements (#1964)
* Use the topic partition queue splitting feature from rdkafka. This allows splitting the main consumer queue into subqueues, so that we can spawn a subtask for each topic-partition tuple (see the sketch after this list).

This gives us roughly an 8-10x improvement in throughput (on my machine).

* Bump to rust-rdkafka 0.35, the latest release that seems to be OK. See fede1024/rust-rdkafka#638 (comment) for more details.

No visible performance differences.

* Add the restate.kafka_ingress.requests.total metric. This is useful for graphing the consumption rate (see the counter sketch after this list).

* Address review feedback

* Plumb the task center through the constructor

* Make sure we cancel child tasks when we're shutting down (see the shutdown sketch after this list)
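
For context, a minimal, self-contained sketch of the rdkafka queue-splitting API this commit builds on. The broker address, group id, topic, and partition are placeholders, and error handling is pared down; the real code in the diff below splits queues lazily, per received message, rather than up front:

```rust
use std::sync::Arc;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder connection settings.
    let consumer: Arc<StreamConsumer> = Arc::new(
        ClientConfig::new()
            .set("group.id", "example-group")
            .set("bootstrap.servers", "localhost:9092")
            // We store offsets explicitly after processing each message.
            .set("enable.auto.offset.store", "false")
            .create()?,
    );
    consumer.subscribe(&["example-topic"])?;

    // Split partition 0 into its own queue: its messages stop flowing
    // through the main queue and can be consumed by a dedicated task.
    let partition_queue = consumer
        .split_partition_queue("example-topic", 0)
        .expect("returns None if the partition queue cannot be obtained");

    let worker_consumer = Arc::clone(&consumer);
    tokio::spawn(async move {
        while let Ok(msg) = partition_queue.recv().await {
            // ... process msg, then mark its offset for the next auto-commit.
            let _ = worker_consumer.store_offset(msg.topic(), msg.partition(), msg.offset());
        }
    });

    // The main queue must still be polled so rebalances and callbacks are served.
    loop {
        let _ = consumer.recv().await;
    }
}
```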
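Likewise, a sketch of how a counter like the one added here is registered and incremented with the metrics crate; the subscription id is a placeholder. Resolving the labeled handle once up front keeps the per-message hot path to a single atomic add, which is the pattern the diff below follows:

```rust
use metrics::{counter, describe_counter, Counter, Unit};

pub const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total";

// Describe the metric once at service startup.
fn describe_metrics() {
    describe_counter!(
        KAFKA_INGRESS_REQUESTS,
        Unit::Count,
        "Number of Kafka ingress requests"
    );
}

fn subscription_counter(subscription_id: &str) -> Counter {
    // The label value is resolved here, once, not per increment.
    counter!(KAFKA_INGRESS_REQUESTS, "subscription" => subscription_id.to_string())
}

fn main() {
    describe_metrics();
    // "sub_placeholder" is a made-up subscription id for illustration.
    let requests = subscription_counter("sub_placeholder");
    // Per-message hot path: a single atomic add on the pre-resolved handle.
    requests.increment(1);
}
```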
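Finally, a sketch of the cooperative-shutdown pattern the last bullet refers to: each child task races its work against a cancellation signal. A tokio watch channel stands in here for restate_core's cancellation_watcher(), which is what the real code uses:

```rust
use std::time::Duration;

use tokio::sync::watch;

// The worker stops promptly once the sender fires the shutdown signal.
async fn worker(mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            _ = do_work() => {}
            _ = shutdown.changed() => return,
        }
    }
}

async fn do_work() {
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(false);
    let handle = tokio::spawn(worker(rx));
    tokio::time::sleep(Duration::from_millis(350)).await;
    let _ = tx.send(true); // request shutdown
    let _ = handle.await;
}
```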
slinkydeveloper authored Sep 18, 2024
1 parent edd4657 commit c17f82b
Showing 6 changed files with 137 additions and 25 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion crates/ingress-kafka/Cargo.toml
@@ -22,8 +22,9 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
rdkafka = { version = "0.34", features = ["libz-static", "cmake-build"] }
rdkafka = { version = "0.35", features = ["libz-static", "cmake-build"] }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
128 changes: 107 additions & 21 deletions crates/ingress-kafka/src/consumer_task.rs
@@ -8,11 +8,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use base64::Engine;
use bytes::Bytes;
use metrics::counter;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
@@ -21,14 +26,16 @@ use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_core::{cancellation_watcher, TaskCenter, TaskId, TaskKind};
use restate_ingress_dispatcher::{
DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest,
};
use restate_types::identifiers::SubscriptionId;
use restate_types::invocation::{Header, SpanRelation};
use restate_types::message::MessageIndex;
use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription};

use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Expand All @@ -45,6 +52,8 @@ pub enum Error {
},
#[error("ingress dispatcher channel is closed")]
IngressDispatcherClosed,
#[error("topic {0} partition {1} queue split didn't succeed")]
TopicPartitionSplit(String, i32),
}

type MessageConsumer = StreamConsumer<DefaultConsumerContext>;
@@ -82,21 +91,25 @@ impl DeduplicationId for KafkaDeduplicationId {
pub struct MessageSender {
subscription: Subscription,
dispatcher: IngressDispatcher,

subscription_id: String,
ingress_request_counter: metrics::Counter,
}

impl MessageSender {
pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self {
Self {
subscription_id: subscription.id().to_string(),
ingress_request_counter: counter!(
KAFKA_INGRESS_REQUESTS,
"subscription" => subscription.id().to_string()
),
subscription,
dispatcher,
}
}

async fn send(
&mut self,
consumer_group_id: &str,
msg: &BorrowedMessage<'_>,
) -> Result<(), Error> {
async fn send(&self, consumer_group_id: &str, msg: BorrowedMessage<'_>) -> Result<(), Error> {
// Prepare ingress span
let ingress_span = info_span!(
"kafka_ingress_consume",
@@ -119,14 +132,14 @@
} else {
Bytes::default()
};
let headers = Self::generate_events_attributes(msg, self.subscription.id());
let headers = Self::generate_events_attributes(&msg, &self.subscription_id);

let req = IngressDispatcherRequest::event(
&self.subscription,
key,
payload,
SpanRelation::Parent(ingress_span_context),
Some(Self::generate_deduplication_id(consumer_group_id, msg)),
Some(Self::generate_deduplication_id(consumer_group_id, &msg)),
headers,
)
.map_err(|cause| Error::Event {
@@ -136,6 +149,8 @@
cause,
})?;

self.ingress_request_counter.increment(1);

self.dispatcher
.dispatch_ingress_request(req)
.instrument(ingress_span)
@@ -144,10 +159,7 @@
Ok(())
}

fn generate_events_attributes(
msg: &impl Message,
subscription_id: SubscriptionId,
) -> Vec<Header> {
fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec<Header> {
let mut headers = Vec::with_capacity(6);
headers.push(Header::new("kafka.offset", msg.offset().to_string()));
headers.push(Header::new("kafka.topic", msg.topic()));
@@ -157,7 +169,7 @@ }
}
headers.push(Header::new(
"restate.subscription.id".to_string(),
subscription_id.to_string(),
subscription_id,
));

if let Some(key) = msg.key() {
@@ -187,21 +199,28 @@

#[derive(Clone)]
pub struct ConsumerTask {
task_center: TaskCenter,
client_config: ClientConfig,
topics: Vec<String>,
sender: MessageSender,
}

impl ConsumerTask {
pub fn new(client_config: ClientConfig, topics: Vec<String>, sender: MessageSender) -> Self {
pub fn new(
task_center: TaskCenter,
client_config: ClientConfig,
topics: Vec<String>,
sender: MessageSender,
) -> Self {
Self {
task_center,
client_config,
topics,
sender,
}
}

pub async fn run(mut self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> {
pub async fn run(self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> {
// Create the consumer and subscribe to the topic
let consumer_group_id = self
.client_config
@@ -213,25 +232,92 @@
self.topics, self.client_config
);

let consumer: MessageConsumer = self.client_config.create()?;
let consumer: Arc<MessageConsumer> = Arc::new(self.client_config.create()?);
let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

loop {
let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();

let result = loop {
tokio::select! {
res = consumer.recv() => {
let msg = res?;
self.sender.send(&consumer_group_id, &msg).await?;
let msg = match res {
Ok(msg) => msg,
Err(e) => break Err(e.into())
};
let topic = msg.topic().to_owned();
let partition = msg.partition();
let offset = msg.offset();

// If we didn't split the queue, let's do it and start the topic partition consumer
if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) {
let topic_partition_consumer = match consumer
.split_partition_queue(&topic, partition) {
Some(q) => q,
None => break Err(Error::TopicPartitionSplit(topic.clone(), partition))
};

let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
topic.clone(), partition,
topic_partition_consumer,
Arc::clone(&consumer),
consumer_group_id.clone()
);

if let Ok(task_id) = self.task_center.spawn_child(TaskKind::Ingress, "partition-queue", None, task) {
e.insert(task_id);
} else {
break Ok(());
}
}

// We got this message, let's send it through
if let Err(e) = self.sender.send(&consumer_group_id, msg).await {
break Err(e)
}

// This method tells rdkafka that we have processed this message,
// so its offset can be safely committed.
// rdkafka periodically commits these offsets asynchronously, with a period configurable
// with auto.commit.interval.ms
consumer.store_offset_from_message(&msg)?;
if let Err(e) = consumer.store_offset(&topic, partition, offset) {
break Err(e.into())
}
}
_ = &mut rx => {
return Ok(());
break Ok(());
}
}
};
for task_id in topic_partition_tasks.into_values() {
self.task_center.cancel_task(task_id);
}
result
}
}

async fn topic_partition_queue_consumption_loop(
sender: MessageSender,
topic: String,
partition: i32,
topic_partition_consumer: StreamPartitionQueue<DefaultConsumerContext>,
consumer: Arc<MessageConsumer>,
consumer_group_id: String,
) -> Result<(), anyhow::Error> {
let mut shutdown = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
res = topic_partition_consumer.recv() => {
let msg = res?;
let offset = msg.offset();
sender.send(&consumer_group_id, msg).await?;
consumer.store_offset(&topic, partition, offset)?;
}
_ = &mut shutdown => {
return Ok(())
}
}
}
}
1 change: 1 addition & 0 deletions crates/ingress-kafka/src/lib.rs
@@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

mod consumer_task;
mod metric_definitions;
mod subscription_controller;

use tokio::sync::mpsc;
21 changes: 21 additions & 0 deletions crates/ingress-kafka/src/metric_definitions.rs
@@ -0,0 +1,21 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use metrics::{describe_counter, Unit};

pub const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total";

pub(crate) fn describe_metrics() {
describe_counter!(
KAFKA_INGRESS_REQUESTS,
Unit::Count,
"Number of Kafka ingress requests"
);
}
4 changes: 3 additions & 1 deletion crates/ingress-kafka/src/subscription_controller.rs
@@ -14,7 +14,7 @@ use std::collections::HashSet;

use crate::subscription_controller::task_orchestrator::TaskOrchestrator;
use rdkafka::error::KafkaError;
use restate_core::cancellation_watcher;
use restate_core::{cancellation_watcher, task_center};
use restate_ingress_dispatcher::IngressDispatcher;
use restate_types::config::IngressOptions;
use restate_types::identifiers::SubscriptionId;
@@ -48,6 +48,7 @@ pub struct Service {

impl Service {
pub fn new(dispatcher: IngressDispatcher) -> Service {
metric_definitions::describe_metrics();
let (commands_tx, commands_rx) = mpsc::channel(10);

Service {
@@ -132,6 +133,7 @@

// Create the consumer task
let consumer_task = consumer_task::ConsumerTask::new(
task_center(),
client_config,
vec![topic.to_string()],
MessageSender::new(subscription, self.dispatcher.clone()),