Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup (aka DISTINCT) support on PhysicalPlan #1914

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::{DBError, PlanError};
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_expr::check::SchemaView;
use spacetimedb_expr::statement::compile_sql_stmt;
use spacetimedb_expr::ty::TyCtx;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::db::error::RelationError;
use spacetimedb_lib::identity::AuthCtx;
Expand Down Expand Up @@ -952,7 +953,7 @@ pub(crate) fn compile_to_ast<T: TableSchemaView>(
) -> Result<Vec<SqlAst>, DBError> {
// NOTE: The following ensures compliance with the 1.0 sql api.
// Come 1.0, it will have replaced the current compilation stack.
compile_sql_stmt(sql_text, &SchemaViewer::new(db, tx, auth))?;
compile_sql_stmt(&mut TyCtx::default(), sql_text, &SchemaViewer::new(db, tx, auth))?;

let dialect = PostgreSqlDialect {};
let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser {
Expand Down
27 changes: 27 additions & 0 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::ops::{Bound, RangeBounds};

use spacetimedb_lib::{AlgebraicValue, ProductValue};
use spacetimedb_primitives::{IndexId, TableId};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::{
blob_store::BlobStore,
btree_index::{BTreeIndex, BTreeIndexRangeIter},
static_assert_size,
table::{IndexScanIter, RowRef, Table, TableScanIter},
};
use std::collections::HashSet;

/// A row from a base table in the form of a pointer or product value
#[derive(Clone)]
Expand Down Expand Up @@ -238,6 +240,8 @@ pub enum Iter<'a> {
UniqueIxJoin(LeftDeepJoin<UniqueIndexJoin<'a>>),
/// A tuple-at-a-time filter iterator
Filter(Filter<'a>),
/// Deduplication
Dedup(Dedup<'a>),
}

impl<'a> Iterator for Iter<'a> {
Expand Down Expand Up @@ -269,6 +273,10 @@ impl<'a> Iterator for Iter<'a> {
// Filter is a passthru
iter.next()
}
Self::Dedup(iter) => {
// Dedup is a passthru
iter.next()
}
Self::NLJoin(iter) => {
iter.next().map(|t| {
match t {
Expand Down Expand Up @@ -557,6 +565,25 @@ impl<'a> Iterator for Filter<'a> {
}
}

/// A tuple at a time deduplication iterator
pub struct Dedup<'a> {
/// The input iterator
input: Box<Iter<'a>>,
/// The set of seen row ids
seen: HashSet<RowPointer>,
}

impl<'a> Iterator for Dedup<'a> {
type Item = Tuple<'a>;

fn next(&mut self) -> Option<Self::Item> {
self.input.find(|tuple| {
let ptr = tuple.expect_row().expect_ptr();
self.seen.insert(ptr.pointer())
})
}
}

/// An opcode for a stack-based expression evaluator
#[derive(Clone, Copy)]
pub enum OpCode {
Expand Down
2 changes: 1 addition & 1 deletion crates/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct Project {
///
/// Relational operators take a single input paramter.
/// Let variables explicitly destructure the input row.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Let {
/// The variable definitions for this let expression
pub vars: Vec<(Symbol, Expr)>,
Expand Down
16 changes: 8 additions & 8 deletions crates/expr/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,20 @@ impl TypeChecker for SqlChecker {
}
}

fn parse_and_type_sql(sql: &str, tx: &impl SchemaView) -> TypingResult<Statement> {
fn parse_and_type_sql(ctx: &mut TyCtx, sql: &str, tx: &impl SchemaView) -> TypingResult<Statement> {
match parse_sql(sql)? {
SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(&mut TyCtx::default(), insert, tx)?)),
SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(&mut TyCtx::default(), delete, tx)?)),
SqlAst::Update(update) => Ok(Statement::Update(type_update(&mut TyCtx::default(), update, tx)?)),
SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(&mut TyCtx::default(), ast, tx)?)),
SqlAst::Set(set) => Ok(Statement::Set(type_set(&TyCtx::default(), set)?)),
SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(ctx, insert, tx)?)),
SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(ctx, delete, tx)?)),
SqlAst::Update(update) => Ok(Statement::Update(type_update(ctx, update, tx)?)),
SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(ctx, ast, tx)?)),
SqlAst::Set(set) => Ok(Statement::Set(type_set(ctx, set)?)),
SqlAst::Show(show) => Ok(Statement::Show(type_show(show)?)),
}
}

/// Parse and type check a *general* query into a [StatementCtx].
pub fn compile_sql_stmt<'a>(sql: &'a str, tx: &impl SchemaView) -> TypingResult<StatementCtx<'a>> {
let statement = parse_and_type_sql(sql, tx)?;
pub fn compile_sql_stmt<'a>(ctx: &mut TyCtx, sql: &'a str, tx: &impl SchemaView) -> TypingResult<StatementCtx<'a>> {
let statement = parse_and_type_sql(ctx, sql, tx)?;
Ok(StatementCtx {
statement,
sql,
Expand Down
29 changes: 23 additions & 6 deletions crates/physical-plan/src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,19 @@ fn compile_cross_joins(ctx: &TyCtx, joins: Vec<RelExpr>) -> PhysicalPlan {
.unwrap()
}

fn compile_dedup(ctx: &TyCtx, query: RelExpr) -> PhysicalPlan {
PhysicalPlan::Dedup(Box::new(compile_rel_expr(ctx, query)))
}

fn compile_rel_expr(ctx: &TyCtx, ast: RelExpr) -> PhysicalPlan {
match ast {
RelExpr::RelVar(table, _ty) => PhysicalPlan::TableScan(table),
RelExpr::Select(select) => compile_filter(ctx, *select),
RelExpr::Proj(proj) => compile_project(ctx, *proj),
RelExpr::Dedup(query) => compile_dedup(ctx, *query),
RelExpr::Join(joins, _) => compile_cross_joins(ctx, joins.into_vec()),
RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => {
unreachable!("DISTINCT is not implemented")
RelExpr::Union(_, _) | RelExpr::Minus(_, _) => {
unreachable!("Union|Minus is not implemented")
}
}
}
Expand Down Expand Up @@ -147,10 +152,11 @@ mod tests {
Ok((expr, ctx))
}

fn compile_sql_stmt_test(sql: &str) -> ResultTest<StatementCtx> {
fn compile_sql_stmt_test(sql: &str) -> ResultTest<(StatementCtx, TyCtx)> {
let tx = SchemaViewer(module_def());
let statement = compile_sql_stmt(sql, &tx)?;
Ok(statement)
let mut ctx = TyCtx::default();
let statement = compile_sql_stmt(&mut ctx, sql, &tx)?;
Ok((statement, ctx))
}

impl PhysicalPlan {
Expand Down Expand Up @@ -184,7 +190,7 @@ mod tests {
let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t")?;
assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::TableScan(_)));

let ast = compile_sql_stmt_test("SELECT u32 FROM t")?;
let (ast, ctx) = compile_sql_stmt_test("SELECT u32 FROM t")?;
assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Project(..)));

Ok(())
Expand Down Expand Up @@ -237,4 +243,15 @@ mod tests {

Ok(())
}

#[test]
fn test_distinct() -> ResultTest<()> {
let (ast, ctx) = compile_sql_stmt_test("SELECT DISTINCT * FROM t")?;

let ast = compile(&ctx, ast).plan;

assert!(matches!(ast, PhysicalPlan::Dedup(_)));

Ok(())
}
}
2 changes: 2 additions & 0 deletions crates/physical-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub enum PhysicalPlan {
Filter(Box<PhysicalPlan>, PhysicalExpr),
/// A tuple-at-a-time projection
Project(Box<PhysicalPlan>, PhysicalExpr),
/// Deduplicate a relation
Dedup(Box<PhysicalPlan>),
}

/// Fetch and return row ids from a btree index
Expand Down
Loading