Skip to content

Commit

Permalink
Dedup (aka DISTINCT) support on PhysicalPlan
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Oct 28, 2024
1 parent 5de7d6b commit 6078b86
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
30 changes: 28 additions & 2 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::borrow::Cow;

use spacetimedb_lib::{AlgebraicValue, ProductValue};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::{
blob_store::BlobStore,
btree_index::BTreeIndex,
static_assert_size,
table::{IndexScanIter, RowRef, Table, TableScanIter},
};
use std::borrow::Cow;
use std::collections::HashSet;

/// A row from a base table in the form of a pointer or product value
#[derive(Clone)]
Expand Down Expand Up @@ -94,6 +95,8 @@ pub enum Iter<'a> {
UniqueMultiColIxSemiRhs(UniqueIxSemiRhs<'a, MultiColProjEvaluator<'a>>),
/// A tuple at a time filter
Filter(Filter<'a>),
/// Deduplication
Dedup(Dedup<'a>),
}

impl<'a> Iterator for Iter<'a> {
Expand Down Expand Up @@ -213,6 +216,10 @@ impl<'a> Iterator for Iter<'a> {
// Filter is a passthru
iter.next()
}
Self::Dedup(iter) => {
// Dedup is a passthru
iter.next()
}
}
}
}
Expand Down Expand Up @@ -368,6 +375,25 @@ impl<'a> Iterator for Filter<'a> {
}
}

/// A tuple at a time deduplication iterator
pub struct Dedup<'a> {
/// The input iterator
input: Box<Iter<'a>>,
/// The set of seen row ids
seen: HashSet<RowPointer>,
}

impl<'a> Iterator for Dedup<'a> {
type Item = Tuple<'a>;

fn next(&mut self) -> Option<Self::Item> {
self.input.find(|tuple| {
let ptr = tuple.expect_row().expect_ptr();
self.seen.insert(ptr.pointer())
})
}
}

/// An opcode for a tuple projection operation
#[derive(Clone, Copy)]
pub enum ProjOpCode {
Expand Down
14 changes: 12 additions & 2 deletions crates/physical-plan/src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ fn compile_rel_expr(ast: RelExpr) -> PhysicalPlan {
RelExpr::Select(select) => compile_filter(*select),
RelExpr::Proj(proj) => compile_project(*proj),
RelExpr::Join(joins, ty) => compile_cross_joins(&joins, ty),
RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => {
unreachable!("DISTINCT is not implemented")
RelExpr::Dedup(query) => PhysicalPlan::Dedup(Box::new(compile_rel_expr(*query))),
RelExpr::Union(_, _) | RelExpr::Minus(_, _) => {
unreachable!("Union|Minus is not implemented")
}
}
}
Expand Down Expand Up @@ -215,4 +216,13 @@ mod tests {

Ok(())
}

#[test]
fn test_distinct() -> ResultTest<()> {
let ast = compile(compile_sql_stmt_test("SELECT DISTINCT * FROM t")?).plan;

assert!(matches!(ast, PhysicalPlan::Dedup(_)));

Ok(())
}
}
2 changes: 2 additions & 0 deletions crates/physical-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum PhysicalPlan {
Filter(Filter),
/// Transform an input relation row by row
Project(Project),
/// Deduplicate a relation
Dedup(Box<PhysicalPlan>),
}

impl PhysicalPlan {
Expand Down

0 comments on commit 6078b86

Please sign in to comment.