Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust bindings for CDF reads #612

Merged

Conversation

PatrickJin-db
Copy link
Collaborator

@PatrickJin-db PatrickJin-db commented Dec 11, 2024

To be released as delta-kernel-rust-sharing-wrapper 0.2.0

Copy link
Collaborator

@linzhou-db linzhou-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave the main review to the kernel rust team

@@ -0,0 +1 @@
Cargo.lock
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you run cargo commands or maturin develop, a Cargo.lock file may be generated. Adding it to the gitignore makes sure you don't accidentally add it to git.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for further context, generally for libraries (like this) you don't want to commit whereas binaries you normally do

})?))
}

// TODO(patrick.jin): Change return type to RecordBatchIterator<impl Iterator<...>>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we finish this TODO before merging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to but it seems non-trivial. But I will try to address it before the final release.

More specifically, changing the Vec to an Iterator requires the return lifetime of Scan::execute and TableChangesScan::execute to be '_, and that requires the return type of those functions to be ...<Arc<...>> insteadof ...<Box<...>>. And upon making that change, I get a cannot be converted to a python object error. Will see if @zachschuermann or @OussamaSaoudi-db knows more about this error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posted a fix this morning, so this is now fixed :)

@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch from 7dfe565 to 5ce85e1 Compare December 11, 2024 19:37
Comment on lines 90 to 119
fn try_create_record_batch_iter(
results: impl Iterator<Item = DeltaResult<ScanResult>>,
result_schema: ArrowSchemaRef,
) -> RecordBatchIterator<Vec<Result<RecordBatch, ArrowError>>> {
let record_batches: Vec<_> = results
.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) =
scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
})
.collect();
RecordBatchIterator::new(record_batches, result_schema)
}

Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn try_create_record_batch_iter(
results: impl Iterator<Item = DeltaResult<ScanResult>>,
result_schema: ArrowSchemaRef,
) -> RecordBatchIterator<Vec<Result<RecordBatch, ArrowError>>> {
let record_batches: Vec<_> = results
.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) =
scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
})
.collect();
RecordBatchIterator::new(record_batches, result_schema)
}
fn try_create_record_batch_iter(
results: impl Iterator<Item = DeltaResult<ScanResult>>,
result_schema: ArrowSchemaRef,
) -> RecordBatchIterator<impl Iterator<Item = Result<RecordBatch, ArrowError>>> {
let record_batches = results.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) = scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
.into();
match mask {
Some(mask) => filter_record_batch(&record_batch, &mask.into()),
None => Ok(record_batch),
}
});
RecordBatchIterator::new(record_batches, result_schema)
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OussamaSaoudi-db since this is a pretty common pattern I wonder if somewhere in default enging/arrow implementation we should basically provide this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's create an issue for it. we have it in like 3 tests and our examples, and here, so clearly it would be useful to have a canonical implementation

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
let scan = self.0.take().unwrap().build()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will panic on unwrap if build is called twice. Perhaps something like

        let scan = self
            .0
            .take()
            .ok_or_else(|| {
                delta_kernel::Error::generic("Can only call build once on TableChangesScanBuilder")
            })?
            .build()?;

And you can put in a suitable error message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue with ScanBuilder::build

@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch 4 times, most recently from 8f0d513 to aaf3e43 Compare December 11, 2024 20:34
Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushing comments

@@ -0,0 +1 @@
Cargo.lock
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for further context, generally for libraries (like this) you don't want to commit whereas binaries you normally do

@@ -11,7 +11,7 @@ crate-type = ["cdylib"]

[dependencies]
arrow = { version = "53.3.0", features = ["pyarrow"] }
delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]}
delta_kernel = { git = "https://github.com/delta-io/delta-kernel-rs.git", rev = "be1453f", features = ["cloud", "default", "default-engine"]}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default features are empty

Suggested change
delta_kernel = { git = "https://github.com/delta-io/delta-kernel-rs.git", rev = "be1453f", features = ["cloud", "default", "default-engine"]}
delta_kernel = { git = "https://github.com/delta-io/delta-kernel-rs.git", rev = "be1453f", features = ["cloud", "default-engine"]}

Comment on lines 90 to 119
fn try_create_record_batch_iter(
results: impl Iterator<Item = DeltaResult<ScanResult>>,
result_schema: ArrowSchemaRef,
) -> RecordBatchIterator<Vec<Result<RecordBatch, ArrowError>>> {
let record_batches: Vec<_> = results
.map(|res| {
let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
let (mask, data) =
scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
.into();
if let Some(mask) = mask {
let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
Ok(filtered_batch)
} else {
Ok(record_batch)
}
})
.collect();
RecordBatchIterator::new(record_batches, result_schema)
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OussamaSaoudi-db since this is a pretty common pattern I wonder if somewhere in default enging/arrow implementation we should basically provide this

@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch 5 times, most recently from df5ab92 to 6351118 Compare December 12, 2024 02:39
Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just got nits on how you may want to refactor/change the code. At a high level, this LGTM

.0
.take()
.ok_or_else(|| {
delta_kernel::Error::generic("Can only call build() once on ScanBuilder")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: to reduce noise, I'd recommend using type aliases.

use delta_kernel::Error as KernelError;
struct PyKernelError(KernelError);

then you would have both

impl From<PyKernelError> for PyErr {
    fn from(error: PyKernelError) -> Self {
        PyValueError::new_err(format!("Kernel error: {}", error.0))
    }
}

impl From<KernelError> for PyKernelError {
    fn from(delta_kernel_error: delta_kernel::Error) -> Self {
        Self(delta_kernel_error)
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar for other delta_kernel::.*

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done for some (but not all) delta_kernel::*. I think refactoring to adopt this pattern everywhere can be a followup

Comment on lines 134 to 168
struct TableChangesScanBuilder(Option<delta_kernel::table_changes::scan::TableChangesScanBuilder>);

#[pymethods]
impl TableChangesScanBuilder {
#[new]
#[pyo3(signature = (table, engine_interface, start_version, end_version=None))]
fn new(
table: &Table,
engine_interface: &PythonInterface,
start_version: u64,
end_version: Option<u64>,
) -> DeltaPyResult<TableChangesScanBuilder> {
let table_changes =
table
.0
.table_changes(engine_interface.0.as_ref(), start_version, end_version)?;
Ok(TableChangesScanBuilder(Some(
table_changes.into_scan_builder(),
)))
}

fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
let scan = self
.0
.take()
.ok_or_else(|| {
delta_kernel::Error::generic(
"Can only call build() once on TableChangesScanBuilder",
)
})?
.build()?;
Ok(TableChangesScan(scan))
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we fail on any subsequent call to build. If we want to avoid the potential error, you can do something like this:

Suggested change
struct TableChangesScanBuilder(Option<delta_kernel::table_changes::scan::TableChangesScanBuilder>);
#[pymethods]
impl TableChangesScanBuilder {
#[new]
#[pyo3(signature = (table, engine_interface, start_version, end_version=None))]
fn new(
table: &Table,
engine_interface: &PythonInterface,
start_version: u64,
end_version: Option<u64>,
) -> DeltaPyResult<TableChangesScanBuilder> {
let table_changes =
table
.0
.table_changes(engine_interface.0.as_ref(), start_version, end_version)?;
Ok(TableChangesScanBuilder(Some(
table_changes.into_scan_builder(),
)))
}
fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
let scan = self
.0
.take()
.ok_or_else(|| {
delta_kernel::Error::generic(
"Can only call build() once on TableChangesScanBuilder",
)
})?
.build()?;
Ok(TableChangesScan(scan))
}
}
#[pyclass]
struct TableChangesScanBuilder {
table_changes: Arc<TableChanges>,
}
#[pymethods]
impl TableChangesScanBuilder {
#[new]
#[pyo3(signature = (table, engine_interface, start_version, end_version=None))]
fn new(
table: &Table,
engine_interface: &PythonInterface,
start_version: u64,
end_version: Option<u64>,
) -> DeltaPyResult<TableChangesScanBuilder> {
let table_changes = table
.0
.table_changes(engine_interface.0.as_ref(), start_version, end_version)?
.into();
Ok(TableChangesScanBuilder { table_changes })
}
fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
// Note: cheap Arc clone
let scan_builder = self.table_changes.clone().scan_builder();
let scan = scan_builder.build()?;
Ok(TableChangesScan(scan))
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do the same thing in Snapsot/ScanBulider. @zachschuermann would like to see what you think.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we change this behavior, we should change it for both TableChangesScanBuilder and ScanBuilder to be consistent.

python/delta-kernel-rust-sharing-wrapper/src/lib.rs Outdated Show resolved Hide resolved
@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch 5 times, most recently from 979c558 to 757aa83 Compare December 13, 2024 09:41
Ok(PyArrowType(Box::new(record_batch_iter)))
}
}

#[pyclass]
struct PythonInterface(Box<dyn Engine + Send>);
struct PythonInterface(Arc<dyn Engine + Send>);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was the motivation to change from box to arc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is related to this kernel change. Now we have to pass an Arc<dyn Engine> into Scan::execute/TableChangesScan::execute.

@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch 2 times, most recently from ac1659c to 6449594 Compare December 18, 2024 01:45
Co-authored-by: Oussama Saoudi <[email protected]>
@PatrickJin-db PatrickJin-db force-pushed the PatrickJin-db/cdf_rust_bindings branch from 6449594 to 04c7a31 Compare December 18, 2024 02:20
@PatrickJin-db PatrickJin-db merged commit 9c33111 into delta-io:main Dec 18, 2024
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants