
test: Support for Multi-Level Partition Tables #88 (Closed)
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -61,6 +61,9 @@
 testcontainers = "0.16.7"
 testcontainers-modules = { version = "0.4.2", features = ["localstack"] }
 time = { version = "0.3.34", features = ["serde"] }
 geojson = "0.24.1"
+tracing = "0.1"
+rand = { version = "0.8.5" }
+approx = "0.5.1"

 [[bin]]
 name = "pgrx_embed_pg_analytics"
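For context, the three new dependencies are all test-facing: tracing for instrumentation, rand for generating sample data, and approx for tolerant float comparisons. A minimal sketch of how rand and approx might be combined in a test; the test name and values below are illustrative assumptions, not taken from this PR:

use approx::assert_relative_eq;
use rand::Rng;

#[test]
fn random_fare_roundtrips_within_tolerance() {
    // Generate a random fare, stand in for a Parquet write/read cycle,
    // and compare with a relative tolerance instead of exact equality.
    let fare: f64 = rand::thread_rng().gen_range(1.0..100.0);
    let roundtripped = fare; // placeholder for the actual round trip
    assert_relative_eq!(fare, roundtripped, max_relative = 1e-9);
}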
38 changes: 36 additions & 2 deletions tests/fixtures/mod.rs
@@ -19,16 +19,18 @@ pub mod arrow;
 pub mod db;
 pub mod tables;

-use anyhow::Result;
+use anyhow::{Context, Result};
 use async_std::task::block_on;
 use aws_config::{BehaviorVersion, Region};
 use aws_sdk_s3::primitives::ByteStream;
 use bytes::Bytes;
 use chrono::{DateTime, Duration};
-use datafusion::arrow::array::*;
+use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
+use datafusion::arrow::datatypes::TimeUnit::Millisecond;
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::{
     arrow::{datatypes::FieldRef, record_batch::RecordBatch},
+    parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
     parquet::arrow::ArrowWriter,
 };
 use futures::future::{BoxFuture, FutureExt};
@@ -141,6 +143,38 @@ impl S3 {
         Ok(())
     }

+    #[allow(unused)]
+    pub async fn get_batch(&self, bucket: &str, key: &str) -> Result<RecordBatch> {
+        // Retrieve the object from S3
+        let get_object_output = self
+            .client
+            .get_object()
+            .bucket(bucket)
+            .key(key)
+            .send()
+            .await
+            .context("Failed to get object from S3")?;
+
+        // Read the body of the object
+        let body = get_object_output.body.collect().await?;
+        let bytes: Bytes = body.into_bytes();
+
+        // Create a Parquet reader
+        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
+            .context("Failed to create Parquet reader builder")?;
+
+        // Create the reader
+        let mut reader = builder.build().context("Failed to build Parquet reader")?;
+
+        // Read the first batch
+        let record_batch = reader
+            .next()
+            .context("No batches found in Parquet file")?
+            .context("Failed to read batch")?;
+
+        Ok(record_batch)
+    }
+
     #[allow(unused)]
     pub async fn put_rows<T: Serialize>(&self, bucket: &str, key: &str, rows: &[T]) -> Result<()> {
         let fields = Vec::<FieldRef>::from_type::<NycTripsTable>(TracingOptions::default())?;
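As a usage note: in a multi-level partition test, get_batch can be pointed at a single partition leaf to verify what was actually written. A hedged sketch follows; the bucket name and partition path are assumptions for illustration and do not appear in this diff:

use anyhow::Result;
use crate::fixtures::S3;

async fn check_partition_leaf(s3: &S3) -> Result<()> {
    // Read back the first record batch from one hypothetical partition leaf.
    let batch = s3
        .get_batch("test-bucket", "trips/year=2024/month=01/part-0.parquet")
        .await?;
    // The batch should be non-empty and match the schema's column count.
    assert!(batch.num_rows() > 0);
    assert_eq!(batch.num_columns(), batch.schema().fields().len());
    Ok(())
}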