Skip to content

Commit

Permalink
Let MultiplexedInvokerStatusReader share state (#1962)
Browse files Browse the repository at this point in the history
The MultiplexedInvokerStatusReader now shares the ChannelStatusReader via
an Arc<Mutex<>>.

This fixes #1961.
  • Loading branch information
tillrohrmann authored Sep 16, 2024
1 parent 1ad1a0d commit d4568a9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ humantime = { workspace = true }
itertools = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
Expand Down
24 changes: 18 additions & 6 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -242,14 +243,16 @@ impl PartitionProcessorHandle {
}
}

type ChannelStatusReaderList = Vec<(RangeInclusive<PartitionKey>, ChannelStatusReader)>;

#[derive(Debug, Clone, Default)]
pub struct MultiplexedInvokerStatusReader {
readers: Vec<(RangeInclusive<PartitionKey>, ChannelStatusReader)>,
readers: Arc<parking_lot::RwLock<ChannelStatusReaderList>>,
}

impl MultiplexedInvokerStatusReader {
fn push(&mut self, key_range: RangeInclusive<PartitionKey>, reader: ChannelStatusReader) {
self.readers.push((key_range, reader));
self.readers.write().push((key_range, reader));
}
}

Expand All @@ -258,15 +261,24 @@ impl StatusHandle for MultiplexedInvokerStatusReader {
std::iter::Flatten<std::vec::IntoIter<<ChannelStatusReader as StatusHandle>::Iterator>>;

async fn read_status(&self, keys: RangeInclusive<PartitionKey>) -> Self::Iterator {
let mut iterators = vec![];
let mut overlapping_partitions = Vec::new();

for (range, reader) in self.readers.iter() {
// first clone the readers while holding the lock, then release the lock before reading the
// status to avoid holding the lock across await points
for (range, reader) in self.readers.read().iter() {
if keys.start() <= range.end() && keys.end() >= range.start() {
// if this partition is actually overlapping with the search range
iterators.push(reader.read_status(keys.clone()).await)
overlapping_partitions.push(reader.clone())
}
}
iterators.into_iter().flatten()

let mut result = Vec::with_capacity(overlapping_partitions.len());

for reader in overlapping_partitions {
result.push(reader.read_status(keys.clone()).await);
}

result.into_iter().flatten()
}
}

Expand Down

0 comments on commit d4568a9

Please sign in to comment.