feat: support LIMIT in metadata cache (#25658)
hiltontj authored Dec 14, 2024
1 parent 486d79d commit df84f9e
Showing 3 changed files with 123 additions and 13 deletions.
32 changes: 26 additions & 6 deletions influxdb3_cache/src/meta_cache/cache.rs
@@ -219,6 +219,7 @@ impl MetaCache {
pub(crate) fn to_record_batch(
&self,
predicates: &IndexMap<ColumnId, Predicate>,
+ limit: Option<usize>,
) -> Result<RecordBatch, ArrowError> {
// predicates may not be provided for all columns in the cache, or not be provided in the
// order of columns in the cache. This re-orders them to the correct order, and fills in any
@@ -237,9 +238,14 @@
.collect();

let expired_time_ns = self.expired_time_ns();
- let _ =
-     self.data
-         .evaluate_predicates(expired_time_ns, predicates.as_slice(), &mut builders);
+ // a limit of usize::MAX would never be reached, so treat `None` as no limit:
+ let limit = limit.unwrap_or(usize::MAX);
+ let _ = self.data.evaluate_predicates(
+     expired_time_ns,
+     predicates.as_slice(),
+     limit,
+     &mut builders,
+ );

RecordBatch::try_new(
Arc::clone(&self.schema),
@@ -412,6 +418,7 @@ impl Node {
&self,
expired_time_ns: i64,
predicates: &[Option<&Predicate>],
+ mut limit: usize,
builders: &mut [StringViewBuilder],
) -> usize {
let mut total_count = 0;
@@ -420,12 +427,13 @@
.expect("predicates should not be empty");
// if there is a predicate, evaluate it, otherwise, just grab everything from the node:
let values_and_nodes = if let Some(predicate) = predicate {
- self.evaluate_predicate(expired_time_ns, predicate)
+ self.evaluate_predicate(expired_time_ns, predicate, limit)
} else {
self.0
.iter()
.filter(|&(_, (t, _))| (t > &expired_time_ns))
.map(|(v, (_, n))| (v.clone(), n.as_ref()))
+ .take(limit)
.collect()
};
let (builder, next_builders) = builders
@@ -435,8 +443,12 @@
// the values to the arrow builders:
for (value, node) in values_and_nodes {
if let Some(node) = node {
- let count =
-     node.evaluate_predicates(expired_time_ns, next_predicates, next_builders);
+ let count = node.evaluate_predicates(
+     expired_time_ns,
+     next_predicates,
+     limit,
+     next_builders,
+ );
if count > 0 {
// we are not on a terminal node in the cache, so create a block, as this value
// repeated `count` times, i.e., depending on how many values come out of
@@ -449,6 +461,11 @@
}
total_count += count;
}
+ if let Some(new_limit) = limit.checked_sub(count) {
+     limit = new_limit;
+ } else {
+     break;
+ }
} else {
builder.append_value(value.0);
total_count += 1;
Expand All @@ -463,6 +480,7 @@ impl Node {
&self,
expired_time_ns: i64,
predicate: &Predicate,
+ limit: usize,
) -> Vec<(Value, Option<&Node>)> {
match &predicate {
Predicate::In(in_list) => in_list
@@ -472,12 +490,14 @@
(t > &expired_time_ns).then(|| (v.clone(), n.as_ref()))
})
})
+ .take(limit)
.collect(),
Predicate::NotIn(not_in_set) => self
.0
.iter()
.filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v))
.map(|(v, (_, n))| (v.clone(), n.as_ref()))
+ .take(limit)
.collect(),
}
}
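The heart of the change is the limit bookkeeping in `evaluate_predicates`: each node collects at most `limit` entries, children are recursed into with the remaining budget, and traversal stops once the budget is spent via `checked_sub`. Below is a minimal, self-contained sketch of that pattern using toy types — not the actual `MetaCache`/`Node` structures, which also carry expiry timestamps and Arrow builders:

```rust
use std::collections::BTreeMap;

/// A toy hierarchy: interior nodes map a value to a child node,
/// terminal nodes map a value to `None`.
struct Node(BTreeMap<String, Option<Node>>);

impl Node {
    /// Collect terminal values depth-first, never exceeding `limit`.
    fn collect(&self, mut limit: usize, out: &mut Vec<String>) -> usize {
        let mut total = 0;
        // take at most `limit` entries at this level, mirroring `.take(limit)`
        for (value, child) in self.0.iter().take(limit) {
            let count = match child {
                // interior node: recurse with the remaining budget
                Some(node) => node.collect(limit, out),
                // terminal node: emit one row
                None => {
                    out.push(value.clone());
                    1
                }
            };
            total += count;
            // shrink the budget and stop once it is spent; the sketch breaks
            // at exactly zero, whereas the commit relies on `.take(limit)`
            // yielding nothing once the budget reaches zero
            match limit.checked_sub(count) {
                Some(rest) if rest > 0 => limit = rest,
                _ => break,
            }
        }
        total
    }
}

fn main() {
    let cache = Node(BTreeMap::from([
        (
            "us-east".to_string(),
            Some(Node(BTreeMap::from([
                ("a".to_string(), None),
                ("b".to_string(), None),
            ]))),
        ),
        (
            "us-west".to_string(),
            Some(Node(BTreeMap::from([
                ("c".to_string(), None),
                ("d".to_string(), None),
            ]))),
        ),
    ]));
    let mut out = Vec::new();
    let n = cache.collect(3, &mut out);
    assert_eq!(n, 3);
    assert_eq!(out, ["a", "b", "c"]);
}
```

Using `checked_sub` keeps the budget arithmetic explicit: exhaustion is a visible `None`/zero case rather than a potential underflow panic.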
93 changes: 88 additions & 5 deletions influxdb3_cache/src/meta_cache/mod.rs
@@ -184,7 +184,7 @@ mod tests {

for tc in test_cases {
let records = cache
- .to_record_batch(&tc.predicates)
+ .to_record_batch(&tc.predicates, None)
.expect("get record batches");
println!("{}", tc.desc);
assert_batches_sorted_eq!(tc.expected, &[records]);
@@ -241,7 +241,7 @@ mod tests {
// check the cache before prune:
// NOTE: this does not include entries that have surpassed the max_age of the cache, though,
// there are still more than the cache's max cardinality, as it has not yet been pruned.
- let records = cache.to_record_batch(&Default::default()).unwrap();
+ let records = cache.to_record_batch(&Default::default(), None).unwrap();
assert_batches_sorted_eq!(
[
"+-----------+------+",
@@ -267,7 +267,7 @@
&[records]
);
cache.prune();
- let records = cache.to_record_batch(&Default::default()).unwrap();
+ let records = cache.to_record_batch(&Default::default(), None).unwrap();
assert_batches_sorted_eq!(
[
"+-----------+------+",
@@ -288,6 +288,89 @@
);
}

+ #[test]
+ fn meta_cache_limit() {
+     let writer = TestWriter::new();
+     let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
+     let rows = writer.write_lp_to_rows(
+         "\
+         cpu,region=us-east,host=a usage=100\n\
+         cpu,region=us-east,host=b usage=100\n\
+         cpu,region=us-west,host=c usage=100\n\
+         cpu,region=us-west,host=d usage=100\n\
+         cpu,region=ca-east,host=e usage=100\n\
+         cpu,region=ca-east,host=f usage=100\n\
+         cpu,region=ca-cent,host=g usage=100\n\
+         cpu,region=ca-cent,host=h usage=100\n\
+         cpu,region=eu-east,host=i usage=100\n\
+         cpu,region=eu-east,host=j usage=100\n\
+         cpu,region=eu-cent,host=k usage=100\n\
+         cpu,region=eu-cent,host=l usage=100\n\
+         ",
+         0,
+     );
+     let table_def = writer.db_schema().table_definition("cpu").unwrap();
+     let column_ids: Vec<ColumnId> = ["region", "host"]
+         .into_iter()
+         .map(|name| table_def.column_name_to_id_unchecked(name))
+         .collect();
+     let mut cache = MetaCache::new(
+         time_provider,
+         CreateMetaCacheArgs {
+             table_def,
+             max_cardinality: MaxCardinality::default(),
+             max_age: MaxAge::default(),
+             column_ids,
+         },
+     )
+     .unwrap();
+
+     for row in rows {
+         cache.push(&row);
+     }
+
+     // no limit produces all records in the cache:
+     let batches = cache.to_record_batch(&Default::default(), None).unwrap();
+     assert_batches_eq!(
+         [
+             "+---------+------+",
+             "| region  | host |",
+             "+---------+------+",
+             "| ca-cent | g    |",
+             "| ca-cent | h    |",
+             "| ca-east | e    |",
+             "| ca-east | f    |",
+             "| eu-cent | k    |",
+             "| eu-cent | l    |",
+             "| eu-east | i    |",
+             "| eu-east | j    |",
+             "| us-east | a    |",
+             "| us-east | b    |",
+             "| us-west | c    |",
+             "| us-west | d    |",
+             "+---------+------+",
+         ],
+         &[batches]
+     );
+
+     // applying a limit returns at most that number of records from the cache:
+     let batches = cache.to_record_batch(&Default::default(), Some(5)).unwrap();
+     assert_batches_eq!(
+         [
+             "+---------+------+",
+             "| region  | host |",
+             "+---------+------+",
+             "| ca-cent | g    |",
+             "| ca-cent | h    |",
+             "| ca-east | e    |",
+             "| ca-east | f    |",
+             "| eu-cent | k    |",
+             "+---------+------+",
+         ],
+         &[batches]
+     );
+ }
+
/// This test sets up a [`MetaCacheProvider`], creates a [`MetaCache`] using the `region` and
/// `host` column, and then writes several different unique combinations of values into it.
/// It then sets up a DataFusion [`SessionContext`], registers our [`MetaCacheFunction`] as a
@@ -663,7 +746,7 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -679,7 +762,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
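For context on the updated `explain_contains` strings: DataFusion pushes a SQL `LIMIT` down into `TableProvider::scan` as `Some(n)`, and `MetaCacheExec` now records and renders it in the physical plan. A hedged sketch of how one might observe this from SQL — the `meta_cache` table-function name follows this module, but the session setup and argument shape here are illustrative, not taken from the commit:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Run EXPLAIN on a limited query against the cache table function and
/// print the plan, which should contain a line like
/// `MetaCacheExec: limit=8 inner=MemoryExec: ...`.
/// Assumes `ctx` already has the meta cache UDTF registered, as the
/// test harness in this file does.
async fn explain_limited(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql("EXPLAIN SELECT * FROM meta_cache('cpu') LIMIT 8")
        .await?;
    df.show().await?;
    Ok(())
}
```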
11 changes: 9 additions & 2 deletions influxdb3_cache/src/meta_cache/table_function.rs
@@ -69,7 +69,7 @@ impl TableProvider for MetaCacheFunctionProvider {
ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
- _limit: Option<usize>,
+ limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let read = self.provider.cache_map.read();
let (batches, predicates) = if let Some(cache) = read
@@ -80,7 +80,7 @@
let predicates = convert_filter_exprs(&self.table_def, self.schema(), filters)?;
(
cache
- .to_record_batch(&predicates)
+ .to_record_batch(&predicates, limit)
.map(|batch| vec![batch])?,
(!predicates.is_empty()).then_some(predicates),
)
@@ -93,6 +93,7 @@
&[batches],
self.schema(),
projection.cloned(),
+ limit,
)?;

let show_sizes = ctx.config_options().explain.show_sizes;
@@ -280,6 +281,7 @@ struct MetaCacheExec {
inner: MemoryExec,
table_def: Arc<TableDefinition>,
predicates: Option<IndexMap<ColumnId, Predicate>>,
+ limit: Option<usize>,
}

impl MetaCacheExec {
@@ -289,11 +291,13 @@
partitions: &[Vec<RecordBatch>],
schema: SchemaRef,
projection: Option<Vec<usize>>,
+ limit: Option<usize>,
) -> Result<Self> {
Ok(Self {
inner: MemoryExec::try_new(partitions, schema, projection)?,
predicates,
table_def,
+ limit,
})
}

@@ -310,6 +314,9 @@ impl DisplayAs for MetaCacheExec {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MetaCacheExec:")?;
+ if let Some(limit) = self.limit {
+     write!(f, " limit={limit}")?;
+ }
if let Some(predicates) = self.predicates.as_ref() {
write!(f, " predicates=[")?;
let mut p_iter = predicates.iter();
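The `DisplayAs` change prints the limit only when one was pushed down, so plans without a `LIMIT` render exactly as before. A standalone sketch of that formatting rule, with a plain function standing in for the real `Formatter`-based impl:

```rust
// Minimal sketch of the conditional `limit=` rendering added to
// `DisplayAs for MetaCacheExec` above.
fn render_header(limit: Option<usize>) -> String {
    let mut out = String::from("MetaCacheExec:");
    if let Some(limit) = limit {
        out.push_str(&format!(" limit={limit}"));
    }
    out
}

fn main() {
    // without a pushed-down limit the header is unchanged
    assert_eq!(render_header(None), "MetaCacheExec:");
    // with one, it matches the `explain_contains` strings in the tests
    assert_eq!(render_header(Some(8)), "MetaCacheExec: limit=8");
}
```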
