From eabd029d92e094d87c3c8afb274ebe18f2841793 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Sat, 21 Dec 2024 04:18:47 +0800 Subject: [PATCH] fix: unity catalog import from write_deltalake (#3630) Closes: #3629 --------- Co-authored-by: Jay Chia --- daft/dataframe/dataframe.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index ca23c73cbb..7174ef73f4 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -870,11 +870,11 @@ def write_deltalake( from packaging.version import parse from daft import from_pydict + from daft.dependencies import unity_catalog from daft.filesystem import get_protocol_from_path from daft.io import DataCatalogTable from daft.io._deltalake import large_dtypes_kwargs from daft.io.object_store_options import io_config_to_storage_options - from daft.unity_catalog import UnityCatalogTable if schema_mode == "merge": raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.") @@ -884,30 +884,35 @@ def write_deltalake( io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config - if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)): + # Retrieve table_uri and storage_options from various backends + table_uri: str + storage_options: dict + + if isinstance(table, deltalake.DeltaTable): + table_uri = table.table_uri + storage_options = table._storage_options or {} + new_storage_options = io_config_to_storage_options(io_config, table_uri) + storage_options.update(new_storage_options or {}) + else: if isinstance(table, str): table_uri = table elif isinstance(table, pathlib.Path): table_uri = str(table) - elif isinstance(table, UnityCatalogTable): + elif unity_catalog.module_available() and isinstance(table, unity_catalog.UnityCatalogTable): table_uri = table.table_uri io_config = table.io_config - else: + elif isinstance(table, DataCatalogTable): table_uri = table.table_uri(io_config) + else: + raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}") if io_config is None: raise ValueError( - "io_config was not provided to write_deltalake and could not be retrieved from the default configuration." + "io_config was not provided to write_deltalake and could not be retrieved from defaults." ) + storage_options = io_config_to_storage_options(io_config, table_uri) or {} table = try_get_deltatable(table_uri, storage_options=storage_options) - elif isinstance(table, deltalake.DeltaTable): - table_uri = table.table_uri - storage_options = table._storage_options or {} - new_storage_options = io_config_to_storage_options(io_config, table_uri) - storage_options.update(new_storage_options or {}) - else: - raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}") # see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/ scheme = get_protocol_from_path(table_uri)