From 445b1dc9e35ed42d56e8e7d4e27e6a74b97f048e Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Sun, 15 Dec 2024 20:31:42 -0800 Subject: [PATCH] WIP on lookup joins --- crates/arroyo-api/src/connection_tables.rs | 12 ++ crates/arroyo-connectors/src/lib.rs | 10 ++ crates/arroyo-connectors/src/redis/lookup.rs | 75 ++++++++ crates/arroyo-connectors/src/redis/mod.rs | 161 ++++++++++-------- .../src/redis/operator/mod.rs | 1 - .../src/redis/{operator => }/sink.rs | 92 +++++----- crates/arroyo-connectors/src/redis/table.json | 9 + crates/arroyo-datastream/src/logical.rs | 2 + crates/arroyo-planner/src/extension/lookup.rs | 121 +++++++++++++ crates/arroyo-planner/src/extension/mod.rs | 2 + crates/arroyo-planner/src/extension/sink.rs | 1 + crates/arroyo-planner/src/lib.rs | 5 +- crates/arroyo-planner/src/plan/join.rs | 89 +++++++++- crates/arroyo-planner/src/rewriters.rs | 15 ++ crates/arroyo-planner/src/tables.rs | 42 +++-- crates/arroyo-rpc/proto/api.proto | 7 + .../arroyo-rpc/src/api_types/connections.rs | 2 + crates/arroyo-worker/src/arrow/lookup_join.rs | 31 ++++ crates/arroyo-worker/src/arrow/mod.rs | 1 + crates/arroyo-worker/src/engine.rs | 1 + 20 files changed, 547 insertions(+), 132 deletions(-) create mode 100644 crates/arroyo-connectors/src/redis/lookup.rs delete mode 100644 crates/arroyo-connectors/src/redis/operator/mod.rs rename crates/arroyo-connectors/src/redis/{operator => }/sink.rs (84%) create mode 100644 crates/arroyo-planner/src/extension/lookup.rs create mode 100644 crates/arroyo-worker/src/arrow/lookup_join.rs diff --git a/crates/arroyo-api/src/connection_tables.rs b/crates/arroyo-api/src/connection_tables.rs index df110b893..904ededf6 100644 --- a/crates/arroyo-api/src/connection_tables.rs +++ b/crates/arroyo-api/src/connection_tables.rs @@ -506,6 +506,9 @@ async fn expand_avro_schema( ConnectionType::Sink => { // don't fetch schemas for sinks for now } + ConnectionType::Lookup => { + todo!("lookup tables cannot be created via the UI") + } } } @@ -518,6 +521,9 @@ async fn expand_avro_schema( schema.inferred = Some(true); Ok(schema) } + ConnectionType::Lookup => { + todo!("lookup tables cannot be created via the UI") + } }; }; @@ -593,6 +599,9 @@ async fn expand_proto_schema( ConnectionType::Sink => { // don't fetch schemas for sinks for now } + ConnectionType::Lookup => { + todo!("lookup tables cannot be created via the UI") + } } } @@ -694,6 +703,9 @@ async fn expand_json_schema( // don't fetch schemas for sinks for now until we're better able to conform our output to the schema schema.inferred = Some(true); } + ConnectionType::Lookup => { + todo!("lookup tables cannot be created via the UI") + } } } diff --git a/crates/arroyo-connectors/src/lib.rs b/crates/arroyo-connectors/src/lib.rs index bc4892269..087342c9d 100644 --- a/crates/arroyo-connectors/src/lib.rs +++ b/crates/arroyo-connectors/src/lib.rs @@ -11,6 +11,8 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::time::Duration; +use arrow::array::{ArrayRef, RecordBatch}; +use async_trait::async_trait; use tokio::sync::mpsc::Sender; use tracing::warn; @@ -64,6 +66,14 @@ pub fn connectors() -> HashMap<&'static str, Box> { #[derive(Serialize, Deserialize)] pub struct EmptyConfig {} +#[async_trait] +pub trait LookupConnector { + fn name(&self) -> String; + + async fn lookup(&mut self, keys: &[ArrayRef]) -> RecordBatch; +} + + pub(crate) async fn send(tx: &mut Sender, msg: TestSourceMessage) { if tx.send(msg).await.is_err() { warn!("Test API rx closed while sending message"); diff --git a/crates/arroyo-connectors/src/redis/lookup.rs b/crates/arroyo-connectors/src/redis/lookup.rs new file mode 100644 index 000000000..2af114259 --- /dev/null +++ b/crates/arroyo-connectors/src/redis/lookup.rs @@ -0,0 +1,75 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use arrow::array::{ArrayRef, AsArray, RecordBatch}; +use arrow::compute::StringArrayType; +use arrow::datatypes::DataType; +use async_trait::async_trait; +use futures::future::OptionFuture; +use futures::stream::FuturesOrdered; +use futures::StreamExt; +use redis::{AsyncCommands, RedisFuture, RedisResult}; +use arroyo_formats::de::ArrowDeserializer; +use crate::LookupConnector; +use crate::redis::{RedisClient, RedisConnector}; +use crate::redis::sink::GeneralConnection; + +pub struct RedisLookup { + deserializer: ArrowDeserializer, + client: RedisClient, + connection: Option, +} + +// pub enum RedisFutureOrNull<'a> { +// RedisFuture(RedisFuture<'a, String>), +// Null +// } +// +// impl <'a> Future for RedisFutureOrNull<'a> { +// type Output = RedisResult>; +// +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// match self { +// RedisFutureOrNull::RedisFuture(f) => (**f).poll().map(|t| Some(t)), +// RedisFutureOrNull::Null => Poll::Ready(None), +// } +// } +// } + +#[async_trait] +impl LookupConnector for RedisLookup { + fn name(&self) -> String { + "RedisLookup".to_string() + } + + async fn lookup(&mut self, keys: &[ArrayRef]) -> RecordBatch { + if self.connection.is_none() { + self.connection = Some(self.client.get_connection().await.unwrap()); + } + + assert_eq!(keys.len(), 1, "redis lookup can only have a single key"); + assert_eq!(*keys[0].data_type(), DataType::Utf8, "redis lookup key must be a string"); + + let key = keys[0].as_string(); + + let connection = self.connection.as_mut().unwrap(); + + let result = connection.mget::<_, Vec>(&key.iter().filter_map(|k| k).collect::>()) + .await + .unwrap(); + + let mut result_iter = result.iter(); + + for k in key.iter() { + if k.is_some() { + self.deserializer.deserialize_slice()result_iter.next() + } + } + + while let Some(t) = futures.next().await { + + }; + + Ok(()) + } +} \ No newline at end of file diff --git a/crates/arroyo-connectors/src/redis/mod.rs b/crates/arroyo-connectors/src/redis/mod.rs index 7a78e88d0..8a678bf98 100644 --- a/crates/arroyo-connectors/src/redis/mod.rs +++ b/crates/arroyo-connectors/src/redis/mod.rs @@ -1,4 +1,5 @@ -mod operator; +pub mod sink; +pub mod lookup; use anyhow::{anyhow, bail}; use arroyo_formats::ser::ArrowSerializer; @@ -19,8 +20,8 @@ use arroyo_rpc::api_types::connections::{ }; use arroyo_rpc::OperatorConfig; -use crate::redis::operator::sink::{GeneralConnection, RedisSinkFunc}; use crate::{pull_opt, pull_option_to_u64}; +use crate::redis::sink::{GeneralConnection, RedisSinkFunc}; pub struct RedisConnector {} @@ -289,52 +290,61 @@ impl Connector for RedisConnector { } let sink = match typ.as_str() { - "sink" => TableType::Target(match pull_opt("target", options)?.as_str() { - "string" => Target::StringTable { - key_prefix: pull_opt("target.key_prefix", options)?, - key_column: options - .remove("target.key_column") - .map(|name| validate_column(schema, name, "target.key_column")) - .transpose()?, - ttl_secs: pull_option_to_u64("target.ttl_secs", options)? - .map(|t| t.try_into()) - .transpose() - .map_err(|_| anyhow!("target.ttl_secs must be greater than 0"))?, - }, - "list" => Target::ListTable { - list_prefix: pull_opt("target.key_prefix", options)?, - list_key_column: options - .remove("target.key_column") - .map(|name| validate_column(schema, name, "target.key_column")) - .transpose()?, - max_length: pull_option_to_u64("target.max_length", options)? - .map(|t| t.try_into()) - .transpose() - .map_err(|_| anyhow!("target.max_length must be greater than 0"))?, - operation: match options.remove("target.operation").as_deref() { - Some("append") | None => ListOperation::Append, - Some("prepend") => ListOperation::Prepend, - Some(op) => { - bail!("'{}' is not a valid value for target.operation; must be one of 'append' or 'prepend'", op); - } - }, - }, - "hash" => Target::HashTable { - hash_field_column: validate_column( - schema, - pull_opt("target.field_column", options)?, - "targets.field_column", - )?, - hash_key_column: options - .remove("target.key_column") - .map(|name| validate_column(schema, name, "target.key_column")) - .transpose()?, - hash_key_prefix: pull_opt("target.key_prefix", options)?, - }, - s => { - bail!("'{}' is not a valid redis target", s); + "lookup" => { + TableType::Lookup { + lookup: Default::default(), } - }), + } + "sink" => { + let target = match pull_opt("target", options)?.as_str() { + "string" => Target::StringTable { + key_prefix: pull_opt("target.key_prefix", options)?, + key_column: options + .remove("target.key_column") + .map(|name| validate_column(schema, name, "target.key_column")) + .transpose()?, + ttl_secs: pull_option_to_u64("target.ttl_secs", options)? + .map(|t| t.try_into()) + .transpose() + .map_err(|_| anyhow!("target.ttl_secs must be greater than 0"))?, + }, + "list" => Target::ListTable { + list_prefix: pull_opt("target.key_prefix", options)?, + list_key_column: options + .remove("target.key_column") + .map(|name| validate_column(schema, name, "target.key_column")) + .transpose()?, + max_length: pull_option_to_u64("target.max_length", options)? + .map(|t| t.try_into()) + .transpose() + .map_err(|_| anyhow!("target.max_length must be greater than 0"))?, + operation: match options.remove("target.operation").as_deref() { + Some("append") | None => ListOperation::Append, + Some("prepend") => ListOperation::Prepend, + Some(op) => { + bail!("'{}' is not a valid value for target.operation; must be one of 'append' or 'prepend'", op); + } + }, + }, + "hash" => Target::HashTable { + hash_field_column: validate_column( + schema, + pull_opt("target.field_column", options)?, + "targets.field_column", + )?, + hash_key_column: options + .remove("target.key_column") + .map(|name| validate_column(schema, name, "target.key_column")) + .transpose()?, + hash_key_prefix: pull_opt("target.key_prefix", options)?, + }, + s => { + bail!("'{}' is not a valid redis target", s); + } + }; + + TableType::Sink { target } + }, s => { bail!("'{}' is not a valid type; must be `sink`", s); } @@ -371,6 +381,15 @@ impl Connector for RedisConnector { let _ = RedisClient::new(&config)?; + let (connection_type, description) = match &table.connector_type { + TableType::Sink { .. } => { + (ConnectionType::Sink, "RedisSink") + } + TableType::Lookup { .. } => { + (ConnectionType::Lookup, "RedisLookup") + } + }; + let config = OperatorConfig { connection: serde_json::to_value(config).unwrap(), table: serde_json::to_value(table).unwrap(), @@ -380,15 +399,15 @@ impl Connector for RedisConnector { framing: schema.framing.clone(), metadata_fields: vec![], }; - + Ok(Connection { id, connector: self.name(), name: name.to_string(), - connection_type: ConnectionType::Sink, + connection_type, schema, config: serde_json::to_string(&config).unwrap(), - description: "RedisSink".to_string(), + description: description.to_string(), }) } @@ -400,22 +419,30 @@ impl Connector for RedisConnector { ) -> anyhow::Result { let client = RedisClient::new(&profile)?; - let (tx, cmd_rx) = tokio::sync::mpsc::channel(128); - let (cmd_tx, rx) = tokio::sync::mpsc::channel(128); - - Ok(ConstructedOperator::from_operator(Box::new( - RedisSinkFunc { - serializer: ArrowSerializer::new( - config.format.expect("redis table must have a format"), - ), - table, - client, - cmd_q: Some((cmd_tx, cmd_rx)), - tx, - rx, - key_index: None, - hash_index: None, - }, - ))) + match table.connector_type { + TableType::Sink { target } => { + let (tx, cmd_rx) = tokio::sync::mpsc::channel(128); + let (cmd_tx, rx) = tokio::sync::mpsc::channel(128); + + Ok(ConstructedOperator::from_operator(Box::new( + RedisSinkFunc { + serializer: ArrowSerializer::new( + config.format.expect("redis table must have a format"), + ), + target, + client, + cmd_q: Some((cmd_tx, cmd_rx)), + tx, + rx, + key_index: None, + hash_index: None, + }, + ))) + + } + TableType::Lookup { .. } => { + todo!() + } + } } } diff --git a/crates/arroyo-connectors/src/redis/operator/mod.rs b/crates/arroyo-connectors/src/redis/operator/mod.rs deleted file mode 100644 index 0ecbfb920..000000000 --- a/crates/arroyo-connectors/src/redis/operator/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod sink; diff --git a/crates/arroyo-connectors/src/redis/operator/sink.rs b/crates/arroyo-connectors/src/redis/sink.rs similarity index 84% rename from crates/arroyo-connectors/src/redis/operator/sink.rs rename to crates/arroyo-connectors/src/redis/sink.rs index 93d9e76ba..b40f8c99a 100644 --- a/crates/arroyo-connectors/src/redis/operator/sink.rs +++ b/crates/arroyo-connectors/src/redis/sink.rs @@ -19,7 +19,7 @@ const FLUSH_BYTES: usize = 10 * 1024 * 1024; pub struct RedisSinkFunc { pub serializer: ArrowSerializer, - pub table: RedisTable, + pub target: Target, pub client: RedisClient, pub cmd_q: Option<(Sender, Receiver)>, @@ -229,19 +229,19 @@ impl ArrowOperator for RedisSinkFunc { } async fn on_start(&mut self, ctx: &mut OperatorContext) { - match &self.table.connector_type { - TableType::Target(Target::ListTable { + match &self.target { + Target::ListTable { list_key_column: Some(key), .. - }) - | TableType::Target(Target::StringTable { + } + | Target::StringTable { key_column: Some(key), .. - }) - | TableType::Target(Target::HashTable { + } + | Target::HashTable { hash_key_column: Some(key), .. - }) => { + } => { self.key_index = Some( ctx.in_schemas .first() @@ -258,9 +258,9 @@ impl ArrowOperator for RedisSinkFunc { _ => {} } - if let TableType::Target(Target::HashTable { + if let Target::HashTable { hash_field_column, .. - }) = &self.table.connector_type + } = &self.target { self.hash_index = Some(ctx.in_schemas.first().expect("no in-schema for redis sink!") .schema @@ -282,17 +282,17 @@ impl ArrowOperator for RedisSinkFunc { size_estimate: 0, last_flushed: Instant::now(), max_push_keys: HashSet::new(), - behavior: match self.table.connector_type { - TableType::Target(Target::StringTable { ttl_secs, .. }) => { + behavior: match &self.target { + Target::StringTable { ttl_secs, .. } => { RedisBehavior::Set { ttl: ttl_secs.map(|t| t.get() as usize), } } - TableType::Target(Target::ListTable { + Target::ListTable { max_length, operation, .. - }) => { + } => { let max = max_length.map(|x| x.get() as usize); match operation { ListOperation::Append => { @@ -303,7 +303,7 @@ impl ArrowOperator for RedisSinkFunc { } } } - TableType::Target(Target::HashTable { .. }) => RedisBehavior::Hash, + Target::HashTable { .. } => RedisBehavior::Hash, }, } .start(); @@ -328,39 +328,37 @@ impl ArrowOperator for RedisSinkFunc { _: &mut dyn Collector, ) { for (i, value) in self.serializer.serialize(&batch).enumerate() { - match &self.table.connector_type { - TableType::Target(target) => match &target { - Target::StringTable { key_prefix, .. } => { - let key = self.make_key(key_prefix, &batch, i); - self.tx - .send(RedisCmd::Data { key, value }) - .await - .expect("Redis writer panicked"); - } - Target::ListTable { list_prefix, .. } => { - let key = self.make_key(list_prefix, &batch, i); + match &self.target { + Target::StringTable { key_prefix, .. } => { + let key = self.make_key(key_prefix, &batch, i); + self.tx + .send(RedisCmd::Data { key, value }) + .await + .expect("Redis writer panicked"); + } + Target::ListTable { list_prefix, .. } => { + let key = self.make_key(list_prefix, &batch, i); - self.tx - .send(RedisCmd::Data { key, value }) - .await - .expect("Redis writer panicked"); - } - Target::HashTable { - hash_key_prefix, .. - } => { - let key = self.make_key(hash_key_prefix, &batch, i); - let field = batch - .column(self.hash_index.expect("no hash index")) - .as_string::() - .value(i) - .to_string(); - - self.tx - .send(RedisCmd::HData { key, field, value }) - .await - .expect("Redis writer panicked"); - } - }, + self.tx + .send(RedisCmd::Data { key, value }) + .await + .expect("Redis writer panicked"); + } + Target::HashTable { + hash_key_prefix, .. + } => { + let key = self.make_key(hash_key_prefix, &batch, i); + let field = batch + .column(self.hash_index.expect("no hash index")) + .as_string::() + .value(i) + .to_string(); + + self.tx + .send(RedisCmd::HData { key, field, value }) + .await + .expect("Redis writer panicked"); + } }; } } diff --git a/crates/arroyo-connectors/src/redis/table.json b/crates/arroyo-connectors/src/redis/table.json index e1dd45234..be1be2212 100644 --- a/crates/arroyo-connectors/src/redis/table.json +++ b/crates/arroyo-connectors/src/redis/table.json @@ -114,6 +114,15 @@ "target" ], "additionalProperties": false + }, + { + "type": "object", + "title": "Lookup", + "properties": { + "lookup": { + "type": "object" + } + } } ] } diff --git a/crates/arroyo-datastream/src/logical.rs b/crates/arroyo-datastream/src/logical.rs index f4689c995..2f4f957a7 100644 --- a/crates/arroyo-datastream/src/logical.rs +++ b/crates/arroyo-datastream/src/logical.rs @@ -32,6 +32,7 @@ pub enum OperatorName { AsyncUdf, Join, InstantJoin, + LookupJoin, WindowFunction, TumblingWindowAggregate, SlidingWindowAggregate, @@ -376,6 +377,7 @@ impl LogicalProgram { OperatorName::Join => "join-with-expiration".to_string(), OperatorName::InstantJoin => "windowed-join".to_string(), OperatorName::WindowFunction => "sql-window-function".to_string(), + OperatorName::LookupJoin => "lookup-join".to_string(), OperatorName::TumblingWindowAggregate => { "sql-tumbling-window-aggregate".to_string() } diff --git a/crates/arroyo-planner/src/extension/lookup.rs b/crates/arroyo-planner/src/extension/lookup.rs new file mode 100644 index 000000000..1b6dbc19f --- /dev/null +++ b/crates/arroyo-planner/src/extension/lookup.rs @@ -0,0 +1,121 @@ +use std::fmt::Formatter; +use datafusion::common::{internal_err, DFSchemaRef}; +use datafusion::logical_expr::{Expr, Join, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::sql::TableReference; +use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef}; +use crate::builder::{NamedNode, Planner}; +use crate::extension::{ArroyoExtension, NodeWithIncomingEdges}; +use crate::multifield_partial_ord; +use crate::tables::ConnectorTable; + +pub const SOURCE_EXTENSION_NAME: &str = "LookupSource"; +pub const JOIN_EXTENSION_NAME: &str = "LookupJoin"; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LookupSource { + pub(crate) table: ConnectorTable, + pub(crate) schema: DFSchemaRef, +} + +multifield_partial_ord!(LookupSource, table); + +impl UserDefinedLogicalNodeCore for LookupSource { + fn name(&self) -> &str { + SOURCE_EXTENSION_NAME + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "LookupSource: {}", self.schema) + } + + fn with_exprs_and_inputs(&self, _exprs: Vec, inputs: Vec) -> datafusion::common::Result { + if !inputs.is_empty() { + return internal_err!("LookupSource cannot have inputs"); + } + + + Ok(Self { + table: self.table.clone(), + schema: self.schema.clone(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LookupJoin { + pub(crate) input: LogicalPlan, + pub(crate) schema: DFSchemaRef, + pub(crate) connector: ConnectorTable, + pub(crate) on: Vec<(Expr, Expr)>, + pub(crate) filter: Option, + pub(crate) alias: Option, +} + +multifield_partial_ord!(LookupJoin, input, connector, on, filter, alias); + +impl ArroyoExtension for LookupJoin { + fn node_name(&self) -> Option { + todo!() + } + + fn plan_node(&self, planner: &Planner, index: usize, input_schemas: Vec) -> datafusion::common::Result { + todo!() + } + + fn output_schema(&self) -> ArroyoSchema { + todo!() + } +} + +impl UserDefinedLogicalNodeCore for LookupJoin { + fn name(&self) -> &str { + JOIN_EXTENSION_NAME + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + let mut e: Vec<_> = self.on.iter() + .flat_map(|(l, r)| vec![l.clone(), r.clone()]) + .collect(); + + if let Some(filter) = &self.filter { + e.push(filter.clone()); + } + + e + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "LookupJoinExtension: {}", self.schema) + } + + fn with_exprs_and_inputs(&self, _: Vec, inputs: Vec) -> datafusion::common::Result { + Ok(Self { + input: inputs[0].clone(), + schema: self.schema.clone(), + connector: self.connector.clone(), + on: self.on.clone(), + filter: self.filter.clone(), + alias: self.alias.clone(), + }) + } +} \ No newline at end of file diff --git a/crates/arroyo-planner/src/extension/mod.rs b/crates/arroyo-planner/src/extension/mod.rs index 7b576b2a0..5b88f06bf 100644 --- a/crates/arroyo-planner/src/extension/mod.rs +++ b/crates/arroyo-planner/src/extension/mod.rs @@ -40,6 +40,8 @@ pub(crate) mod table_source; pub(crate) mod updating_aggregate; pub(crate) mod watermark_node; pub(crate) mod window_fn; +pub(crate) mod lookup; + pub(crate) trait ArroyoExtension: Debug { // if the extension has a name, return it so that we can memoize. fn node_name(&self) -> Option; diff --git a/crates/arroyo-planner/src/extension/sink.rs b/crates/arroyo-planner/src/extension/sink.rs index 0e559d175..5141e6e3a 100644 --- a/crates/arroyo-planner/src/extension/sink.rs +++ b/crates/arroyo-planner/src/extension/sink.rs @@ -61,6 +61,7 @@ impl SinkExtension { (false, false) => {} } } + Table::LookupTable(..) => return plan_err!("cannot use a lookup table as a sink"), Table::MemoryTable { .. } => return plan_err!("memory tables not supported"), Table::TableFromQuery { .. } => {} Table::PreviewSink { .. } => { diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index a9972e1af..7ead899ec 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -826,8 +826,11 @@ pub async fn parse_and_get_arrow_program( logical_plan.replace(plan_rewrite); continue; } + Table::LookupTable(_) => { + plan_err!("lookup (temporary) tables cannot be inserted into") + } Table::TableFromQuery { .. } => { - plan_err!("Shouldn't be inserting more data into a table made with CREATE TABLE AS") + plan_err!("shouldn't be inserting more data into a table made with CREATE TABLE AS") } Table::PreviewSink { .. } => { plan_err!("queries shouldn't be able insert into preview sink.") diff --git a/crates/arroyo-planner/src/plan/join.rs b/crates/arroyo-planner/src/plan/join.rs index 9dea42c56..ae5d3c2e3 100644 --- a/crates/arroyo-planner/src/plan/join.rs +++ b/crates/arroyo-planner/src/plan/join.rs @@ -4,7 +4,7 @@ use crate::plan::WindowDetectingVisitor; use crate::{fields_with_qualifiers, schema_from_df_fields_with_metadata, ArroyoSchemaProvider}; use arroyo_datastream::WindowType; use arroyo_rpc::UPDATING_META_FIELD; -use datafusion::common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor}; use datafusion::common::{ not_impl_err, plan_err, Column, DataFusionError, JoinConstraint, JoinType, Result, ScalarValue, TableReference, @@ -16,6 +16,8 @@ use datafusion::logical_expr::{ }; use datafusion::prelude::coalesce; use std::sync::Arc; +use crate::extension::lookup::{LookupJoin, LookupSource}; +use crate::tables::ConnectorTable; pub(crate) struct JoinRewriter<'a> { pub schema_provider: &'a ArroyoSchemaProvider, @@ -189,6 +191,86 @@ impl JoinRewriter<'_> { } } +#[derive(Default)] +struct FindLookupExtension { + table: Option, + filter: Option, + alias: Option, +} + +impl <'a> TreeNodeVisitor<'a> for FindLookupExtension { + type Node = LogicalPlan; + + fn f_down(&mut self, node: &Self::Node) -> Result { + match node { + LogicalPlan::Extension(e) => { + if let Some(s) = e.node.as_any().downcast_ref::() { + self.table = Some(s.table.clone()); + return Ok(TreeNodeRecursion::Stop); + } + } + LogicalPlan::Filter(filter) => { + if self.filter.replace(filter.predicate.clone()).is_some() { + return plan_err!("multiple filters found in lookup join, which is not supported"); + } + } + LogicalPlan::SubqueryAlias(s) => { + self.alias = Some(s.alias.clone()); + } + _ => { + return plan_err!("lookup tables must be used directly within a join"); + } + } + Ok(TreeNodeRecursion::Continue) + } +} + +fn has_lookup(plan: &LogicalPlan) -> Result { + plan.exists(|p| Ok(match p { + LogicalPlan::Extension(e) => e.node.as_any().is::(), + _ => false + })) +} + +fn maybe_plan_lookup_join(join: &Join) -> Result> { + if has_lookup(&join.left)? { + return plan_err!("lookup sources must be on the right side of an inner or left join"); + } + + if !has_lookup(&join.right)? { + return Ok(None); + } + + println!("JOin = {:?} {:?}\n{:#?}", join.join_constraint, join.join_type, join.on); + + match join.join_type { + JoinType::Inner | JoinType::Left => {} + t => { + return plan_err!("{} join is not supported for lookup tables; must be a left or inner join", t); + } + } + + if join.filter.is_some() { + return plan_err!("filter join conditions are not supported for lookup joins; must have an equality condition"); + } + + let mut lookup = FindLookupExtension::default(); + join.right.visit(&mut lookup)?; + + let connector = lookup.table.expect("right side of join does not have lookup"); + + Ok(Some(LogicalPlan::Extension(Extension { + node: Arc::new(LookupJoin { + input: (*join.left).clone(), + schema: join.schema.clone(), + connector, + on: join.on.clone(), + filter: lookup.filter, + alias: lookup.alias, + }) + }))) +} + impl TreeNodeRewriter for JoinRewriter<'_> { type Node = LogicalPlan; @@ -196,6 +278,11 @@ impl TreeNodeRewriter for JoinRewriter<'_> { let LogicalPlan::Join(join) = node else { return Ok(Transformed::no(node)); }; + + if let Some(plan) = maybe_plan_lookup_join(&join)? { + return Ok(Transformed::yes(plan)); + } + let is_instant = Self::check_join_windowing(&join)?; let Join { diff --git a/crates/arroyo-planner/src/rewriters.rs b/crates/arroyo-planner/src/rewriters.rs index 611822719..12cbfd21f 100644 --- a/crates/arroyo-planner/src/rewriters.rs +++ b/crates/arroyo-planner/src/rewriters.rs @@ -36,6 +36,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; +use crate::extension::lookup::LookupSource; /// Rewrites a logical plan to move projections out of table scans /// and into a separate projection node which may include virtual fields, @@ -215,6 +216,19 @@ impl SourceRewriter<'_> { }))) } + fn mutate_lookup_table( + &self, + table_scan: &TableScan, + table: &ConnectorTable + ) -> DFResult> { + Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(LookupSource { + table: table.clone(), + schema: table_scan.projected_schema.clone(), + }) + }))) + } + fn mutate_table_from_query( &self, table_scan: &TableScan, @@ -273,6 +287,7 @@ impl TreeNodeRewriter for SourceRewriter<'_> { match table { Table::ConnectorTable(table) => self.mutate_connector_table(&table_scan, table), + Table::LookupTable(table) => self.mutate_lookup_table(&table_scan, table), Table::MemoryTable { name, fields: _, diff --git a/crates/arroyo-planner/src/tables.rs b/crates/arroyo-planner/src/tables.rs index dd8f7141f..8bc133313 100644 --- a/crates/arroyo-planner/src/tables.rs +++ b/crates/arroyo-planner/src/tables.rs @@ -418,7 +418,7 @@ impl ConnectorTable { pub fn as_sql_source(&self) -> Result { match self.connection_type { ConnectionType::Source => {} - ConnectionType::Sink => { + ConnectionType::Sink | ConnectionType::Lookup => { return plan_err!("cannot read from sink"); } }; @@ -463,6 +463,7 @@ pub struct SourceOperator { #[allow(clippy::enum_variant_names)] #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Table { + LookupTable(ConnectorTable), ConnectorTable(ConnectorTable), MemoryTable { name: String, @@ -673,6 +674,7 @@ impl Table { columns, with_options, query: None, + temporary, .. }) = statement { @@ -750,17 +752,23 @@ impl Table { ), None => None, }; - Ok(Some(Table::ConnectorTable( - ConnectorTable::from_options( - &name, - connector, - fields, - primary_keys, - &mut with_map, - connection_profile, - ) - .map_err(|e| e.context(format!("Failed to create table {}", name)))?, - ))) + let table = ConnectorTable::from_options( + &name, + connector, + fields, + primary_keys, + &mut with_map, + connection_profile, + ).map_err(|e| e.context(format!("Failed to create table {}", name)))?; + + Ok(Some(match table.connection_type { + ConnectionType::Source | ConnectionType::Sink => { + Table::ConnectorTable(table) + } + ConnectionType::Lookup => { + Table::LookupTable(table) + } + })) } } } else { @@ -798,7 +806,7 @@ impl Table { pub fn name(&self) -> &str { match self { Table::MemoryTable { name, .. } | Table::TableFromQuery { name, .. } => name.as_str(), - Table::ConnectorTable(c) => c.name.as_str(), + Table::ConnectorTable(c) | Table::LookupTable(c) => c.name.as_str(), Table::PreviewSink { .. } => "preview", } } @@ -836,7 +844,11 @@ impl Table { fields, inferred_fields, .. - }) => inferred_fields + }) | Table::LookupTable(ConnectorTable { + fields, + inferred_fields, + .. + }) => inferred_fields .as_ref() .map(|fs| fs.iter().map(|f| f.field().clone()).collect()) .unwrap_or_else(|| { @@ -856,7 +868,7 @@ impl Table { pub fn connector_op(&self) -> Result { match self { - Table::ConnectorTable(c) => Ok(c.connector_op()), + Table::ConnectorTable(c) | Table::LookupTable(c) => Ok(c.connector_op()), Table::MemoryTable { .. } => plan_err!("can't write to a memory table"), Table::TableFromQuery { .. } => todo!(), Table::PreviewSink { logical_plan: _ } => Ok(default_sink()), diff --git a/crates/arroyo-rpc/proto/api.proto b/crates/arroyo-rpc/proto/api.proto index 24c120c11..09c8621f5 100644 --- a/crates/arroyo-rpc/proto/api.proto +++ b/crates/arroyo-rpc/proto/api.proto @@ -70,6 +70,13 @@ message JoinOperator { optional uint64 ttl_micros = 6; } +message LookupJoinOperator { + string name = 1; + ArroyoSchema schema = 2; + ConnectorOp connector = 3; + +} + message WindowFunctionOperator { string name = 1; ArroyoSchema input_schema = 2; diff --git a/crates/arroyo-rpc/src/api_types/connections.rs b/crates/arroyo-rpc/src/api_types/connections.rs index 93ca72d60..3ce2a8f72 100644 --- a/crates/arroyo-rpc/src/api_types/connections.rs +++ b/crates/arroyo-rpc/src/api_types/connections.rs @@ -51,6 +51,7 @@ pub struct ConnectionProfilePost { pub enum ConnectionType { Source, Sink, + Lookup, } impl Display for ConnectionType { @@ -58,6 +59,7 @@ impl Display for ConnectionType { match self { ConnectionType::Source => write!(f, "SOURCE"), ConnectionType::Sink => write!(f, "SINK"), + ConnectionType::Lookup => write!(f, "LOOKUP"), } } } diff --git a/crates/arroyo-worker/src/arrow/lookup_join.rs b/crates/arroyo-worker/src/arrow/lookup_join.rs new file mode 100644 index 000000000..c11927e22 --- /dev/null +++ b/crates/arroyo-worker/src/arrow/lookup_join.rs @@ -0,0 +1,31 @@ +use arrow_array::RecordBatch; +use async_trait::async_trait; +use datafusion::physical_expr::PhysicalExpr; +use arroyo_connectors::LookupConnector; +use arroyo_operator::context::{Collector, OperatorContext}; +use arroyo_operator::operator::ArrowOperator; + + +pub struct LookupJoin { + connector: Box, + key_exprs: Vec, +} + +#[async_trait] +impl ArrowOperator for LookupJoin { + fn name(&self) -> String { + format!("LookupJoin<{}>", self.connector.name()) + } + + async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut OperatorContext, collector: &mut dyn Collector) { + let keys = self.key_exprs.iter() + .map(|expr| expr.evaluate(&batch).unwrap().into_array().unwrap()) + .collect::>(); + + + + for i in 0..keys.num_rows() { + + } + } +} \ No newline at end of file diff --git a/crates/arroyo-worker/src/arrow/mod.rs b/crates/arroyo-worker/src/arrow/mod.rs index 70353877a..56ff81e7b 100644 --- a/crates/arroyo-worker/src/arrow/mod.rs +++ b/crates/arroyo-worker/src/arrow/mod.rs @@ -35,6 +35,7 @@ pub mod tumbling_aggregating_window; pub mod updating_aggregator; pub mod watermark_generator; pub mod window_fn; +mod lookup_join; pub struct ValueExecutionOperator { name: String, diff --git a/crates/arroyo-worker/src/engine.rs b/crates/arroyo-worker/src/engine.rs index 879bab37f..ab01a3240 100644 --- a/crates/arroyo-worker/src/engine.rs +++ b/crates/arroyo-worker/src/engine.rs @@ -874,6 +874,7 @@ pub fn construct_operator( OperatorName::ExpressionWatermark => Box::new(WatermarkGeneratorConstructor), OperatorName::Join => Box::new(JoinWithExpirationConstructor), OperatorName::InstantJoin => Box::new(InstantJoinConstructor), + OperatorName::LookupJoin => todo!(), OperatorName::WindowFunction => Box::new(WindowFunctionConstructor), OperatorName::ConnectorSource | OperatorName::ConnectorSink => { let op: api::ConnectorOp = prost::Message::decode(config).unwrap();