Skip to content

Commit

Permalink
Refactor deserialization to reduce code duplication and fix gaps in a…
Browse files Browse the repository at this point in the history
…dditional field deser
  • Loading branch information
mwylde committed Dec 31, 2024
1 parent 455ac13 commit c6448c2
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 351 deletions.
4 changes: 1 addition & 3 deletions crates/arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use anyhow::{anyhow, bail, Context};
use arrow::array::{ArrayRef, RecordBatch};
use arroyo_operator::connector::ErasedConnector;
use arroyo_rpc::api_types::connections::{
ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType, TestSourceMessage,
};
use arroyo_rpc::primitive_to_sql;
use arroyo_rpc::var_str::VarStr;
use arroyo_types::{string_to_map, SourceError};
use async_trait::async_trait;
use arroyo_types::string_to_map;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Client;
use serde::{Deserialize, Serialize};
Expand Down
3 changes: 2 additions & 1 deletion crates/arroyo-connectors/src/redis/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl LookupConnector for RedisLookup {
for v in vs {
match v {
Value::Nil => {
self.deserializer.deserialize_slice("null".as_bytes(), SystemTime::now(), None)
self.deserializer
.deserialize_slice("null".as_bytes(), SystemTime::now(), None)
.await;
}
Value::SimpleString(s) => {
Expand Down
26 changes: 5 additions & 21 deletions crates/arroyo-formats/src/avro/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
fn deserializer_with_schema(
format: AvroFormat,
writer_schema: Option<&str>,
) -> (ArrowDeserializer, Vec<Box<dyn ArrayBuilder>>, ArroyoSchema) {
) -> (ArrowDeserializer, ArroyoSchema) {
let arrow_schema = if format.into_unstructured_json {
Schema::new(vec![Field::new("value", DataType::Utf8, false)])
} else {
Expand All @@ -239,13 +239,6 @@ mod tests {
ArroyoSchema::from_schema_keys(Arc::new(Schema::new(fields)), vec![]).unwrap()
};

let builders: Vec<_> = arroyo_schema
.schema
.fields
.iter()
.map(|f| make_builder(f.data_type(), 8))
.collect();

let resolver: Arc<dyn SchemaResolver + Sync> = if let Some(schema) = &writer_schema {
Arc::new(FixedSchemaResolver::new(
if format.confluent_schema_registry {
Expand All @@ -263,11 +256,10 @@ mod tests {
ArrowDeserializer::with_schema_resolver(
Format::Avro(format),
None,
arroyo_schema.clone(),
Arc::new(arroyo_schema.clone()),
BadData::Fail {},
resolver,
),
builders,
arroyo_schema,
)
}
Expand All @@ -277,23 +269,15 @@ mod tests {
writer_schema: Option<&str>,
message: &[u8],
) -> Vec<serde_json::Map<String, serde_json::Value>> {
let (mut deserializer, mut builders, arroyo_schema) =
let (mut deserializer, arroyo_schema) =
deserializer_with_schema(format.clone(), writer_schema);

let errors = deserializer
.deserialize_slice(&mut builders, message, SystemTime::now(), None)
.deserialize_slice(message, SystemTime::now(), None)
.await;
assert_eq!(errors, vec![]);

let batch = if format.into_unstructured_json {
RecordBatch::try_new(
arroyo_schema.schema,
builders.into_iter().map(|mut b| b.finish()).collect(),
)
.unwrap()
} else {
deserializer.flush_buffer().unwrap().unwrap()
};
let batch = deserializer.flush_buffer().unwrap().unwrap();

record_batch_to_vec(&batch, true, arrow_json::writer::TimestampFormat::RFC3339)
.unwrap()
Expand Down
Loading

0 comments on commit c6448c2

Please sign in to comment.