Skip to content

Commit

Permalink
WIP on lookup joins
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 16, 2024
1 parent 3c5e0d9 commit 445b1dc
Show file tree
Hide file tree
Showing 20 changed files with 547 additions and 132 deletions.
12 changes: 12 additions & 0 deletions crates/arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ async fn expand_avro_schema(
ConnectionType::Sink => {
// don't fetch schemas for sinks for now
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand All @@ -518,6 +521,9 @@ async fn expand_avro_schema(
schema.inferred = Some(true);
Ok(schema)
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
};
};

Expand Down Expand Up @@ -593,6 +599,9 @@ async fn expand_proto_schema(
ConnectionType::Sink => {
// don't fetch schemas for sinks for now
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand Down Expand Up @@ -694,6 +703,9 @@ async fn expand_json_schema(
// don't fetch schemas for sinks for now until we're better able to conform our output to the schema
schema.inferred = Some(true);
}
ConnectionType::Lookup => {
todo!("lookup tables cannot be created via the UI")
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use arrow::array::{ArrayRef, RecordBatch};
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
use tracing::warn;

Expand Down Expand Up @@ -64,6 +66,14 @@ pub fn connectors() -> HashMap<&'static str, Box<dyn ErasedConnector>> {
#[derive(Serialize, Deserialize)]
pub struct EmptyConfig {}

/// Interface implemented by connectors that can serve key-based lookups.
///
/// The trait is declared through `#[async_trait]` so that `lookup` can be an
/// `async fn`.
#[async_trait]
pub trait LookupConnector {
/// Returns a name for this connector as an owned `String`.
fn name(&self) -> String;

/// Asynchronously looks up `keys` and returns the outcome as a single
/// `RecordBatch`.
///
/// `keys` is a slice of Arrow arrays; presumably one array per key column,
/// with one entry per row to look up — TODO confirm against callers.
///
/// NOTE(review): the signature has no error channel, so an implementation
/// can only report failure by panicking — confirm whether a `Result`
/// return type is intended. How rows of the returned batch correspond to
/// rows of `keys` is not specified here.
async fn lookup(&mut self, keys: &[ArrayRef]) -> RecordBatch;
}


pub(crate) async fn send(tx: &mut Sender<TestSourceMessage>, msg: TestSourceMessage) {
if tx.send(msg).await.is_err() {
warn!("Test API rx closed while sending message");
Expand Down
75 changes: 75 additions & 0 deletions crates/arroyo-connectors/src/redis/lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow::array::{ArrayRef, AsArray, RecordBatch};
use arrow::compute::StringArrayType;
use arrow::datatypes::DataType;
use async_trait::async_trait;
use futures::future::OptionFuture;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use redis::{AsyncCommands, RedisFuture, RedisResult};
use arroyo_formats::de::ArrowDeserializer;
use crate::LookupConnector;
use crate::redis::{RedisClient, RedisConnector};
use crate::redis::sink::GeneralConnection;

/// State held by the Redis implementation of `LookupConnector`.
pub struct RedisLookup {
/// Deserializer that `lookup` applies to the values fetched from Redis
/// (presumably turning them into Arrow rows — TODO confirm).
deserializer: ArrowDeserializer,
/// Client from which `lookup` obtains its connection.
client: RedisClient,
/// Connection reused across `lookup` calls; `None` until the first call
/// establishes it.
connection: Option<GeneralConnection>,
}

// pub enum RedisFutureOrNull<'a> {
// RedisFuture(RedisFuture<'a, String>),
// Null
// }
//
// impl <'a> Future for RedisFutureOrNull<'a> {
// type Output = RedisResult<Option<String>>;
//
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// match self {
// RedisFutureOrNull::RedisFuture(f) => (**f).poll().map(|t| Some(t)),
// RedisFutureOrNull::Null => Poll::Ready(None),
// }
// }
// }

/// Redis-backed implementation of `LookupConnector`.
///
/// NOTE(review): this block is work in progress and does not compile as
/// written; see the review notes inside `lookup`.
#[async_trait]
impl LookupConnector for RedisLookup {
/// Returns the fixed name `"RedisLookup"`.
fn name(&self) -> String {
"RedisLookup".to_string()
}

/// Fetches the values for one column of string keys with a single `MGET`
/// request.
///
/// # Panics
///
/// Panics if the connection cannot be established, if `keys` does not
/// contain exactly one array, if that array is not of type `Utf8`, or if
/// the `MGET` request fails.
async fn lookup(&mut self, keys: &[ArrayRef]) -> RecordBatch {
// Connect lazily on the first call and reuse the connection afterwards.
if self.connection.is_none() {
self.connection = Some(self.client.get_connection().await.unwrap());
}

// Only a single, string-typed key column is supported.
assert_eq!(keys.len(), 1, "redis lookup can only have a single key");
assert_eq!(*keys[0].data_type(), DataType::Utf8, "redis lookup key must be a string");

let key = keys[0].as_string();

let connection = self.connection.as_mut().unwrap();

// Null keys are dropped before the request, so only non-null keys are sent.
// NOTE(review): Redis MGET yields nil for keys that do not exist; confirm
// that decoding the reply as `Vec<String>` tolerates that, otherwise
// `Vec<Option<String>>` is probably needed.
let result = connection.mget::<_, Vec<String>>(&key.iter().filter_map(|k| k).collect::<Vec<_>>())
.await
.unwrap();

let mut result_iter = result.iter();

// Walk the keys again, advancing through the fetched values only for
// non-null keys, which keeps each value paired with the key that was sent.
for k in key.iter() {
if k.is_some() {
// NOTE(review): this line is not valid Rust — presumably the next
// fetched value is meant to be passed to `deserialize_slice`.
self.deserializer.deserialize_slice()result_iter.next()
}
}

// NOTE(review): no `futures` binding exists in this function; this loop
// looks like a leftover from an earlier per-key future approach — confirm
// and remove.
while let Some(t) = futures.next().await {

};

// NOTE(review): `Ok(())` does not match the declared `RecordBatch` return type.
Ok(())
}
}
161 changes: 94 additions & 67 deletions crates/arroyo-connectors/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod operator;
pub mod sink;
pub mod lookup;

use anyhow::{anyhow, bail};
use arroyo_formats::ser::ArrowSerializer;
Expand All @@ -19,8 +20,8 @@ use arroyo_rpc::api_types::connections::{
};
use arroyo_rpc::OperatorConfig;

use crate::redis::operator::sink::{GeneralConnection, RedisSinkFunc};
use crate::{pull_opt, pull_option_to_u64};
use crate::redis::sink::{GeneralConnection, RedisSinkFunc};

pub struct RedisConnector {}

Expand Down Expand Up @@ -289,52 +290,61 @@ impl Connector for RedisConnector {
}

let sink = match typ.as_str() {
"sink" => TableType::Target(match pull_opt("target", options)?.as_str() {
"string" => Target::StringTable {
key_prefix: pull_opt("target.key_prefix", options)?,
key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
ttl_secs: pull_option_to_u64("target.ttl_secs", options)?
.map(|t| t.try_into())
.transpose()
.map_err(|_| anyhow!("target.ttl_secs must be greater than 0"))?,
},
"list" => Target::ListTable {
list_prefix: pull_opt("target.key_prefix", options)?,
list_key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
max_length: pull_option_to_u64("target.max_length", options)?
.map(|t| t.try_into())
.transpose()
.map_err(|_| anyhow!("target.max_length must be greater than 0"))?,
operation: match options.remove("target.operation").as_deref() {
Some("append") | None => ListOperation::Append,
Some("prepend") => ListOperation::Prepend,
Some(op) => {
bail!("'{}' is not a valid value for target.operation; must be one of 'append' or 'prepend'", op);
}
},
},
"hash" => Target::HashTable {
hash_field_column: validate_column(
schema,
pull_opt("target.field_column", options)?,
"targets.field_column",
)?,
hash_key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
hash_key_prefix: pull_opt("target.key_prefix", options)?,
},
s => {
bail!("'{}' is not a valid redis target", s);
"lookup" => {
TableType::Lookup {
lookup: Default::default(),
}
}),
}
"sink" => {
let target = match pull_opt("target", options)?.as_str() {
"string" => Target::StringTable {
key_prefix: pull_opt("target.key_prefix", options)?,
key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
ttl_secs: pull_option_to_u64("target.ttl_secs", options)?
.map(|t| t.try_into())
.transpose()
.map_err(|_| anyhow!("target.ttl_secs must be greater than 0"))?,
},
"list" => Target::ListTable {
list_prefix: pull_opt("target.key_prefix", options)?,
list_key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
max_length: pull_option_to_u64("target.max_length", options)?
.map(|t| t.try_into())
.transpose()
.map_err(|_| anyhow!("target.max_length must be greater than 0"))?,
operation: match options.remove("target.operation").as_deref() {
Some("append") | None => ListOperation::Append,
Some("prepend") => ListOperation::Prepend,
Some(op) => {
bail!("'{}' is not a valid value for target.operation; must be one of 'append' or 'prepend'", op);
}
},
},
"hash" => Target::HashTable {
hash_field_column: validate_column(
schema,
pull_opt("target.field_column", options)?,
"targets.field_column",
)?,
hash_key_column: options
.remove("target.key_column")
.map(|name| validate_column(schema, name, "target.key_column"))
.transpose()?,
hash_key_prefix: pull_opt("target.key_prefix", options)?,
},
s => {
bail!("'{}' is not a valid redis target", s);
}
};

TableType::Sink { target }
},
s => {
bail!("'{}' is not a valid type; must be `sink`", s);
}
Expand Down Expand Up @@ -371,6 +381,15 @@ impl Connector for RedisConnector {

let _ = RedisClient::new(&config)?;

let (connection_type, description) = match &table.connector_type {
TableType::Sink { .. } => {
(ConnectionType::Sink, "RedisSink")
}
TableType::Lookup { .. } => {
(ConnectionType::Lookup, "RedisLookup")
}
};

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
Expand All @@ -380,15 +399,15 @@ impl Connector for RedisConnector {
framing: schema.framing.clone(),
metadata_fields: vec![],
};

Ok(Connection {
id,
connector: self.name(),
name: name.to_string(),
connection_type: ConnectionType::Sink,
connection_type,
schema,
config: serde_json::to_string(&config).unwrap(),
description: "RedisSink".to_string(),
description: description.to_string(),
})
}

Expand All @@ -400,22 +419,30 @@ impl Connector for RedisConnector {
) -> anyhow::Result<ConstructedOperator> {
let client = RedisClient::new(&profile)?;

let (tx, cmd_rx) = tokio::sync::mpsc::channel(128);
let (cmd_tx, rx) = tokio::sync::mpsc::channel(128);

Ok(ConstructedOperator::from_operator(Box::new(
RedisSinkFunc {
serializer: ArrowSerializer::new(
config.format.expect("redis table must have a format"),
),
table,
client,
cmd_q: Some((cmd_tx, cmd_rx)),
tx,
rx,
key_index: None,
hash_index: None,
},
)))
match table.connector_type {
TableType::Sink { target } => {
let (tx, cmd_rx) = tokio::sync::mpsc::channel(128);
let (cmd_tx, rx) = tokio::sync::mpsc::channel(128);

Ok(ConstructedOperator::from_operator(Box::new(
RedisSinkFunc {
serializer: ArrowSerializer::new(
config.format.expect("redis table must have a format"),
),
target,
client,
cmd_q: Some((cmd_tx, cmd_rx)),
tx,
rx,
key_index: None,
hash_index: None,
},
)))

}
TableType::Lookup { .. } => {
todo!()
}
}
}
}
1 change: 0 additions & 1 deletion crates/arroyo-connectors/src/redis/operator/mod.rs

This file was deleted.

Loading

0 comments on commit 445b1dc

Please sign in to comment.