diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index cd196f2c21b..646eba9798d 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -1,12 +1,13 @@ -use std::borrow::Cow; - use spacetimedb_lib::{AlgebraicValue, ProductValue}; +use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::{ blob_store::BlobStore, btree_index::BTreeIndex, static_assert_size, table::{IndexScanIter, RowRef, Table, TableScanIter}, }; +use std::borrow::Cow; +use std::collections::HashSet; /// A row from a base table in the form of a pointer or product value #[derive(Clone)] @@ -94,6 +95,8 @@ pub enum Iter<'a> { UniqueMultiColIxSemiRhs(UniqueIxSemiRhs<'a, MultiColProjEvaluator<'a>>), /// A tuple at a time filter Filter(Filter<'a>), + /// Deduplication + Dedup(Dedup<'a>), } impl<'a> Iterator for Iter<'a> { @@ -213,6 +216,10 @@ impl<'a> Iterator for Iter<'a> { // Filter is a passthru iter.next() } + Self::Dedup(iter) => { + // Dedup is a passthru + iter.next() + } } } } @@ -368,6 +375,25 @@ impl<'a> Iterator for Filter<'a> { } } +/// A tuple at a time deduplication iterator +pub struct Dedup<'a> { + /// The input iterator + input: Box>, + /// The set of seen row ids + seen: HashSet, +} + +impl<'a> Iterator for Dedup<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| { + let ptr = tuple.expect_row().expect_ptr(); + self.seen.insert(ptr.pointer()) + }) + } +} + /// An opcode for a tuple projection operation #[derive(Clone, Copy)] pub enum ProjOpCode { diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index a504507b86c..19e06d493d7 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -88,8 +88,9 @@ fn compile_rel_expr(ast: RelExpr) -> PhysicalPlan { RelExpr::Select(select) => compile_filter(*select), RelExpr::Proj(proj) => compile_project(*proj), RelExpr::Join(joins, ty) => compile_cross_joins(&joins, ty), - RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => { - unreachable!("DISTINCT is not implemented") + RelExpr::Dedup(query) => PhysicalPlan::Dedup(Box::new(compile_rel_expr(*query))), + RelExpr::Union(_, _) | RelExpr::Minus(_, _) => { + unreachable!("Union|Minus is not implemented") } } } @@ -215,4 +216,13 @@ mod tests { Ok(()) } + + #[test] + fn test_distinct() -> ResultTest<()> { + let ast = compile(compile_sql_stmt_test("SELECT DISTINCT * FROM t")?).plan; + + assert!(matches!(ast, PhysicalPlan::Dedup(_))); + + Ok(()) + } } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index e0cd9a0a32f..7ff183cc626 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -24,6 +24,8 @@ pub enum PhysicalPlan { Filter(Filter), /// Transform an input relation row by row Project(Project), + /// Deduplicate a relation + Dedup(Box), } impl PhysicalPlan {