Skip to content

Commit

Permalink
fix: unity catalog import from write_deltalake (#3630)
Browse files Browse the repository at this point in the history
Closes: #3629

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Dec 20, 2024
1 parent 5d4db4f commit eabd029
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,11 +870,11 @@ def write_deltalake(
from packaging.version import parse

from daft import from_pydict
from daft.dependencies import unity_catalog
from daft.filesystem import get_protocol_from_path
from daft.io import DataCatalogTable
from daft.io._deltalake import large_dtypes_kwargs
from daft.io.object_store_options import io_config_to_storage_options
from daft.unity_catalog import UnityCatalogTable

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
Expand All @@ -884,30 +884,35 @@ def write_deltalake(

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
# Retrieve table_uri and storage_options from various backends
table_uri: str
storage_options: dict

if isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})
else:
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
elif isinstance(table, UnityCatalogTable):
elif unity_catalog.module_available() and isinstance(table, unity_catalog.UnityCatalogTable):
table_uri = table.table_uri
io_config = table.io_config
else:
elif isinstance(table, DataCatalogTable):
table_uri = table.table_uri(io_config)
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

if io_config is None:
raise ValueError(
"io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
"io_config was not provided to write_deltalake and could not be retrieved from defaults."
)

storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

# see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
scheme = get_protocol_from_path(table_uri)
Expand Down

0 comments on commit eabd029

Please sign in to comment.