diff --git a/crates/core/src/sql/ast.rs b/crates/core/src/sql/ast.rs index 15ab7443faa..eff695f8708 100644 --- a/crates/core/src/sql/ast.rs +++ b/crates/core/src/sql/ast.rs @@ -3,6 +3,7 @@ use crate::error::{DBError, PlanError}; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_expr::check::SchemaView; use spacetimedb_expr::statement::compile_sql_stmt; +use spacetimedb_expr::ty::TyCtx; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::error::RelationError; use spacetimedb_lib::identity::AuthCtx; @@ -952,7 +953,7 @@ pub(crate) fn compile_to_ast( ) -> Result, DBError> { // NOTE: The following ensures compliance with the 1.0 sql api. // Come 1.0, it will have replaced the current compilation stack. - compile_sql_stmt(sql_text, &SchemaViewer::new(db, tx, auth))?; + compile_sql_stmt(&mut TyCtx::default(), sql_text, &SchemaViewer::new(db, tx, auth))?; let dialect = PostgreSqlDialect {}; let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser { diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index 8db3e2c9c7c..3c80922f19c 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -2,12 +2,14 @@ use std::ops::{Bound, RangeBounds}; use spacetimedb_lib::{AlgebraicValue, ProductValue}; use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::{ blob_store::BlobStore, btree_index::{BTreeIndex, BTreeIndexRangeIter}, static_assert_size, table::{IndexScanIter, RowRef, Table, TableScanIter}, }; +use std::collections::HashSet; /// A row from a base table in the form of a pointer or product value #[derive(Clone)] @@ -238,6 +240,8 @@ pub enum Iter<'a> { UniqueIxJoin(LeftDeepJoin>), /// A tuple-at-a-time filter iterator Filter(Filter<'a>), + /// Deduplication + Dedup(Dedup<'a>), } impl<'a> Iterator for Iter<'a> { @@ -269,6 +273,10 @@ impl<'a> Iterator for Iter<'a> { // Filter is a passthru iter.next() } + Self::Dedup(iter) => { + // Dedup is a passthru + iter.next() + } Self::NLJoin(iter) => { iter.next().map(|t| { match t { @@ -557,6 +565,25 @@ impl<'a> Iterator for Filter<'a> { } } +/// A tuple at a time deduplication iterator +pub struct Dedup<'a> { + /// The input iterator + input: Box>, + /// The set of seen row ids + seen: HashSet, +} + +impl<'a> Iterator for Dedup<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| { + let ptr = tuple.expect_row().expect_ptr(); + self.seen.insert(ptr.pointer()) + }) + } +} + /// An opcode for a stack-based expression evaluator #[derive(Clone, Copy)] pub enum OpCode { diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index a799953d684..ca821f71d77 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -10,7 +10,7 @@ use spacetimedb_sql_parser::ast::BinOp; use super::ty::{InvalidTypeId, Symbol, TyCtx, TyId, Type, TypeWithCtx}; /// A logical relational expression -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum RelExpr { /// A base table RelVar(Arc, TyId), @@ -65,7 +65,7 @@ impl RelExpr { } /// A relational select operation or filter -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Select { /// The input relation pub input: RelExpr, @@ -74,7 +74,7 @@ pub struct Select { } /// A relational project operation or map -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Project { /// The input relation pub input: RelExpr, @@ -86,7 +86,7 @@ pub struct Project { /// /// Relational operators take a single input paramter. /// Let variables explicitly destructure the input row. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Let { /// The variable definitions for this let expression pub vars: Vec<(Symbol, Expr)>, diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 05e631d0945..a49d51331a9 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -348,20 +348,20 @@ impl TypeChecker for SqlChecker { } } -fn parse_and_type_sql(sql: &str, tx: &impl SchemaView) -> TypingResult { +fn parse_and_type_sql(ctx: &mut TyCtx, sql: &str, tx: &impl SchemaView) -> TypingResult { match parse_sql(sql)? { - SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(&mut TyCtx::default(), insert, tx)?)), - SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(&mut TyCtx::default(), delete, tx)?)), - SqlAst::Update(update) => Ok(Statement::Update(type_update(&mut TyCtx::default(), update, tx)?)), - SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(&mut TyCtx::default(), ast, tx)?)), - SqlAst::Set(set) => Ok(Statement::Set(type_set(&TyCtx::default(), set)?)), + SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(ctx, insert, tx)?)), + SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(ctx, delete, tx)?)), + SqlAst::Update(update) => Ok(Statement::Update(type_update(ctx, update, tx)?)), + SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(ctx, ast, tx)?)), + SqlAst::Set(set) => Ok(Statement::Set(type_set(ctx, set)?)), SqlAst::Show(show) => Ok(Statement::Show(type_show(show)?)), } } /// Parse and type check a *general* query into a [StatementCtx]. -pub fn compile_sql_stmt<'a>(sql: &'a str, tx: &impl SchemaView) -> TypingResult> { - let statement = parse_and_type_sql(sql, tx)?; +pub fn compile_sql_stmt<'a>(ctx: &mut TyCtx, sql: &'a str, tx: &impl SchemaView) -> TypingResult> { + let statement = parse_and_type_sql(ctx, sql, tx)?; Ok(StatementCtx { statement, sql, diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 4420c5d3595..27e59d95e82 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -74,14 +74,19 @@ fn compile_cross_joins(ctx: &TyCtx, joins: Vec) -> PhysicalPlan { .unwrap() } +fn compile_dedup(ctx: &TyCtx, query: RelExpr) -> PhysicalPlan { + PhysicalPlan::Dedup(Box::new(compile_rel_expr(ctx, query))) +} + fn compile_rel_expr(ctx: &TyCtx, ast: RelExpr) -> PhysicalPlan { match ast { RelExpr::RelVar(table, _ty) => PhysicalPlan::TableScan(table), RelExpr::Select(select) => compile_filter(ctx, *select), RelExpr::Proj(proj) => compile_project(ctx, *proj), + RelExpr::Dedup(query) => compile_dedup(ctx, *query), RelExpr::Join(joins, _) => compile_cross_joins(ctx, joins.into_vec()), - RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => { - unreachable!("DISTINCT is not implemented") + RelExpr::Union(_, _) | RelExpr::Minus(_, _) => { + unreachable!("Union|Minus is not implemented") } } } @@ -147,10 +152,11 @@ mod tests { Ok((expr, ctx)) } - fn compile_sql_stmt_test(sql: &str) -> ResultTest { + fn compile_sql_stmt_test(sql: &str) -> ResultTest<(StatementCtx, TyCtx)> { let tx = SchemaViewer(module_def()); - let statement = compile_sql_stmt(sql, &tx)?; - Ok(statement) + let mut ctx = TyCtx::default(); + let statement = compile_sql_stmt(&mut ctx, sql, &tx)?; + Ok((statement, ctx)) } impl PhysicalPlan { @@ -184,7 +190,7 @@ mod tests { let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t")?; assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::TableScan(_))); - let ast = compile_sql_stmt_test("SELECT u32 FROM t")?; + let (ast, ctx) = compile_sql_stmt_test("SELECT u32 FROM t")?; assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Project(..))); Ok(()) @@ -237,4 +243,15 @@ mod tests { Ok(()) } + + #[test] + fn test_distinct() -> ResultTest<()> { + let (ast, ctx) = compile_sql_stmt_test("SELECT DISTINCT * FROM t")?; + + let ast = compile(&ctx, ast).plan; + + assert!(matches!(ast, PhysicalPlan::Dedup(_))); + + Ok(()) + } } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 38098ed10aa..4a6cf0e2bfc 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -5,7 +5,6 @@ use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColId, IndexId}; use spacetimedb_schema::schema::TableSchema; use spacetimedb_sql_parser::ast::BinOp; -use std::{ops::Bound, sync::Arc}; /// A physical plan is a concrete evaluation strategy. /// As such, we can reason about its energy consumption. @@ -34,32 +33,8 @@ pub enum PhysicalPlan { Filter(Box, PhysicalExpr), /// A tuple-at-a-time projection Project(Box, PhysicalExpr), -} - -impl PhysicalPlan { - pub fn as_project(&self) -> Option<&Project> { - if let PhysicalPlan::Project(p) = self { - Some(p) - } else { - None - } - } - - pub fn as_filter(&self) -> Option<&Filter> { - if let PhysicalPlan::Filter(p) = self { - Some(p) - } else { - None - } - } - - pub fn as_cross(&self) -> Option<&CrossJoin> { - if let PhysicalPlan::CrossJoin(p) = self { - Some(p) - } else { - None - } - } + /// Deduplicate a relation + Dedup(Box), } /// Fetch and return row ids from a btree index @@ -169,10 +144,3 @@ pub struct PhysicalCtx<'a> { pub sql: &'a str, pub source: StatementSource, } - -/// A physical context for the result of a query compilation. -pub struct PhysicalCtx<'a> { - pub plan: PhysicalPlan, - pub sql: &'a str, - pub source: StatementSource, -}