Skip to content

Commit

Permalink
Dedup (aka DISTINCT) support on PhysicalPlan
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Nov 11, 2024
1 parent 64c9865 commit 088ceb4
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 53 deletions.
3 changes: 2 additions & 1 deletion crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::{DBError, PlanError};
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_expr::check::SchemaView;
use spacetimedb_expr::statement::compile_sql_stmt;
use spacetimedb_expr::ty::TyCtx;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::db::error::RelationError;
use spacetimedb_lib::identity::AuthCtx;
Expand Down Expand Up @@ -952,7 +953,7 @@ pub(crate) fn compile_to_ast<T: TableSchemaView>(
) -> Result<Vec<SqlAst>, DBError> {
// NOTE: The following ensures compliance with the 1.0 sql api.
// Come 1.0, it will have replaced the current compilation stack.
compile_sql_stmt(sql_text, &SchemaViewer::new(db, tx, auth))?;
compile_sql_stmt(&mut TyCtx::default(), sql_text, &SchemaViewer::new(db, tx, auth))?;

let dialect = PostgreSqlDialect {};
let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser {
Expand Down
27 changes: 27 additions & 0 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::ops::{Bound, RangeBounds};

use spacetimedb_lib::{AlgebraicValue, ProductValue};
use spacetimedb_primitives::{IndexId, TableId};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::{
blob_store::BlobStore,
btree_index::{BTreeIndex, BTreeIndexRangeIter},
static_assert_size,
table::{IndexScanIter, RowRef, Table, TableScanIter},
};
use std::collections::HashSet;

/// A row from a base table in the form of a pointer or product value
#[derive(Clone)]
Expand Down Expand Up @@ -238,6 +240,8 @@ pub enum Iter<'a> {
UniqueIxJoin(LeftDeepJoin<UniqueIndexJoin<'a>>),
/// A tuple-at-a-time filter iterator
Filter(Filter<'a>),
/// Deduplication
Dedup(Dedup<'a>),
}

impl<'a> Iterator for Iter<'a> {
Expand Down Expand Up @@ -269,6 +273,10 @@ impl<'a> Iterator for Iter<'a> {
// Filter is a passthru
iter.next()
}
Self::Dedup(iter) => {
// Dedup is a passthru
iter.next()
}
Self::NLJoin(iter) => {
iter.next().map(|t| {
match t {
Expand Down Expand Up @@ -557,6 +565,25 @@ impl<'a> Iterator for Filter<'a> {
}
}

/// A tuple-at-a-time deduplication iterator.
///
/// Wraps another [`Iter`] and remembers which rows it has already produced,
/// so that each row is handed downstream at most once.
///
/// NOTE(review): rows are keyed by [`RowPointer`], not by column values, so
/// this presumably only collapses repeated occurrences of the *same* stored
/// row; confirm this matches the intended `DISTINCT` semantics.
pub struct Dedup<'a> {
    /// The upstream iterator whose tuples are being deduplicated.
    input: Box<Iter<'a>>,
    /// Row pointers of every tuple encountered so far.
    seen: HashSet<RowPointer>,
}

impl<'a> Iterator for Dedup<'a> {
    type Item = Tuple<'a>;

    /// Pull tuples from the input until one is found whose row pointer has
    /// not been recorded yet, record it, and return that tuple.
    ///
    /// Returns `None` once the input is exhausted.
    ///
    /// NOTE(review): `expect_row` / `expect_ptr` presumably panic when the
    /// tuple is not a single base-table row held by pointer; confirm that
    /// the planner never places `Dedup` above joins or projections.
    fn next(&mut self) -> Option<Self::Item> {
        for candidate in self.input.by_ref() {
            let row_ptr = candidate.expect_row().expect_ptr().pointer();
            // `insert` reports `true` only for a pointer that was not in the set yet.
            if self.seen.insert(row_ptr) {
                return Some(candidate);
            }
        }
        None
    }
}

/// An opcode for a stack-based expression evaluator
#[derive(Clone, Copy)]
pub enum OpCode {
Expand Down
8 changes: 4 additions & 4 deletions crates/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use spacetimedb_sql_parser::ast::BinOp;
use super::ty::{InvalidTypeId, Symbol, TyCtx, TyId, Type, TypeWithCtx};

/// A logical relational expression
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum RelExpr {
/// A base table
RelVar(Arc<TableSchema>, TyId),
Expand Down Expand Up @@ -65,7 +65,7 @@ impl RelExpr {
}

/// A relational select operation or filter
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Select {
/// The input relation
pub input: RelExpr,
Expand All @@ -74,7 +74,7 @@ pub struct Select {
}

/// A relational project operation or map
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Project {
/// The input relation
pub input: RelExpr,
Expand All @@ -86,7 +86,7 @@ pub struct Project {
///
/// Relational operators take a single input parameter.
/// Let variables explicitly destructure the input row.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Let {
/// The variable definitions for this let expression
pub vars: Vec<(Symbol, Expr)>,
Expand Down
16 changes: 8 additions & 8 deletions crates/expr/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,20 @@ impl TypeChecker for SqlChecker {
}
}

fn parse_and_type_sql(sql: &str, tx: &impl SchemaView) -> TypingResult<Statement> {
fn parse_and_type_sql(ctx: &mut TyCtx, sql: &str, tx: &impl SchemaView) -> TypingResult<Statement> {
match parse_sql(sql)? {
SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(&mut TyCtx::default(), insert, tx)?)),
SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(&mut TyCtx::default(), delete, tx)?)),
SqlAst::Update(update) => Ok(Statement::Update(type_update(&mut TyCtx::default(), update, tx)?)),
SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(&mut TyCtx::default(), ast, tx)?)),
SqlAst::Set(set) => Ok(Statement::Set(type_set(&TyCtx::default(), set)?)),
SqlAst::Insert(insert) => Ok(Statement::Insert(type_insert(ctx, insert, tx)?)),
SqlAst::Delete(delete) => Ok(Statement::Delete(type_delete(ctx, delete, tx)?)),
SqlAst::Update(update) => Ok(Statement::Update(type_update(ctx, update, tx)?)),
SqlAst::Query(ast) => Ok(Statement::Select(SqlChecker::type_ast(ctx, ast, tx)?)),
SqlAst::Set(set) => Ok(Statement::Set(type_set(ctx, set)?)),
SqlAst::Show(show) => Ok(Statement::Show(type_show(show)?)),
}
}

/// Parse and type check a *general* query into a [StatementCtx].
pub fn compile_sql_stmt<'a>(sql: &'a str, tx: &impl SchemaView) -> TypingResult<StatementCtx<'a>> {
let statement = parse_and_type_sql(sql, tx)?;
pub fn compile_sql_stmt<'a>(ctx: &mut TyCtx, sql: &'a str, tx: &impl SchemaView) -> TypingResult<StatementCtx<'a>> {
let statement = parse_and_type_sql(ctx, sql, tx)?;
Ok(StatementCtx {
statement,
sql,
Expand Down
29 changes: 23 additions & 6 deletions crates/physical-plan/src/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,19 @@ fn compile_cross_joins(ctx: &TyCtx, joins: Vec<RelExpr>) -> PhysicalPlan {
.unwrap()
}

/// Lower a logical deduplication over `query` into a [`PhysicalPlan::Dedup`].
///
/// The input relation is compiled first with [`compile_rel_expr`]; the
/// resulting plan becomes the boxed child of the `Dedup` node.
fn compile_dedup(ctx: &TyCtx, query: RelExpr) -> PhysicalPlan {
    let input_plan = compile_rel_expr(ctx, query);
    PhysicalPlan::Dedup(Box::new(input_plan))
}

fn compile_rel_expr(ctx: &TyCtx, ast: RelExpr) -> PhysicalPlan {
match ast {
RelExpr::RelVar(table, _ty) => PhysicalPlan::TableScan(table),
RelExpr::Select(select) => compile_filter(ctx, *select),
RelExpr::Proj(proj) => compile_project(ctx, *proj),
RelExpr::Dedup(query) => compile_dedup(ctx, *query),
RelExpr::Join(joins, _) => compile_cross_joins(ctx, joins.into_vec()),
RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => {
unreachable!("DISTINCT is not implemented")
RelExpr::Union(_, _) | RelExpr::Minus(_, _) => {
unreachable!("Union|Minus is not implemented")
}
}
}
Expand Down Expand Up @@ -147,10 +152,11 @@ mod tests {
Ok((expr, ctx))
}

fn compile_sql_stmt_test(sql: &str) -> ResultTest<StatementCtx> {
fn compile_sql_stmt_test(sql: &str) -> ResultTest<(StatementCtx, TyCtx)> {
let tx = SchemaViewer(module_def());
let statement = compile_sql_stmt(sql, &tx)?;
Ok(statement)
let mut ctx = TyCtx::default();
let statement = compile_sql_stmt(&mut ctx, sql, &tx)?;
Ok((statement, ctx))
}

impl PhysicalPlan {
Expand Down Expand Up @@ -184,7 +190,7 @@ mod tests {
let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t")?;
assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::TableScan(_)));

let ast = compile_sql_stmt_test("SELECT u32 FROM t")?;
let (ast, ctx) = compile_sql_stmt_test("SELECT u32 FROM t")?;
assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Project(..)));

Ok(())
Expand Down Expand Up @@ -237,4 +243,15 @@ mod tests {

Ok(())
}

/// `SELECT DISTINCT` must compile to a plan whose root is `PhysicalPlan::Dedup`.
#[test]
fn test_distinct() -> ResultTest<()> {
    let (statement, ty_ctx) = compile_sql_stmt_test("SELECT DISTINCT * FROM t")?;
    let physical = compile(&ty_ctx, statement).plan;

    assert!(matches!(physical, PhysicalPlan::Dedup(_)));

    Ok(())
}
}
36 changes: 2 additions & 34 deletions crates/physical-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use spacetimedb_lib::AlgebraicValue;
use spacetimedb_primitives::{ColId, IndexId};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_sql_parser::ast::BinOp;
use std::{ops::Bound, sync::Arc};

/// A physical plan is a concrete evaluation strategy.
/// As such, we can reason about its energy consumption.
Expand Down Expand Up @@ -34,32 +33,8 @@ pub enum PhysicalPlan {
Filter(Box<PhysicalPlan>, PhysicalExpr),
/// A tuple-at-a-time projection
Project(Box<PhysicalPlan>, PhysicalExpr),
}

impl PhysicalPlan {
pub fn as_project(&self) -> Option<&Project> {
if let PhysicalPlan::Project(p) = self {
Some(p)
} else {
None
}
}

pub fn as_filter(&self) -> Option<&Filter> {
if let PhysicalPlan::Filter(p) = self {
Some(p)
} else {
None
}
}

pub fn as_cross(&self) -> Option<&CrossJoin> {
if let PhysicalPlan::CrossJoin(p) = self {
Some(p)
} else {
None
}
}
/// Deduplicate a relation
Dedup(Box<PhysicalPlan>),
}

/// Fetch and return row ids from a btree index
Expand Down Expand Up @@ -169,10 +144,3 @@ pub struct PhysicalCtx<'a> {
pub sql: &'a str,
pub source: StatementSource,
}

/// A physical context for the result of a query compilation.
pub struct PhysicalCtx<'a> {
pub plan: PhysicalPlan,
pub sql: &'a str,
pub source: StatementSource,
}

0 comments on commit 088ceb4

Please sign in to comment.