
daft.DataFrame.write_deltalake does not support append for tables with schema evolution #3559

Open
anilmenon14 opened this issue Dec 12, 2024 · 3 comments
Labels: bug (Something isn't working) · data-catalogs (Related to support for Data Catalogs) · p2 (Nice to have features)

Comments

@anilmenon14 (Contributor)

Describe the bug

Appends to a Delta Lake table are not permitted when the schema of the DataFrame differs from that of the table. There appears to be a safety check in place in daft.DataFrame.write_deltalake, noticed while working on PR #3522, that prevents this from happening. Specifically, this block of code is the one enforcing it.

Without knowing too much of the history, I assume this safety check is in place because delta-rs likely has/had some limitation.
I could locate https://github.com/delta-io/delta-rs/pull/2246 in the delta-rs project, which appears to add support for what we are seeking; however, from what I see in the delta-rs writer, I believe this is the relevant block of code that explicitly prevents mode='append' and schema_mode='overwrite' from being used together.
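
For reference, the underlying delta-rs Python bindings (the deltalake package) do appear to expose schema evolution on append via schema_mode="merge", while schema_mode="overwrite" is tied to mode="overwrite". A minimal sketch, assuming a hypothetical local path:

# Sketch using the deltalake package directly (hypothetical path).
import pyarrow as pa
from deltalake import write_deltalake

# Create a small table, then append data that carries an extra column.
write_deltalake("/tmp/demo_table", pa.table({"a": [1, 2]}))

# schema_mode="merge" evolves the table schema on append; passing
# schema_mode="overwrite" together with mode="append" raises an error.
write_deltalake(
    "/tmp/demo_table",
    pa.table({"a": [3], "b": ["x"]}),
    mode="append",
    schema_mode="merge",
)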

Handling schema evolution in Daft DataFrame writes to Delta Lake would be a great feature.

To Reproduce

  1. Read a DataFrame using daft.read_deltalake().
  2. Perform transformations that add/remove columns.
  3. Attempt daft.DataFrame.write_deltalake(table="sometable", mode="append", schema_mode="overwrite") (see the sketch below).
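
A minimal sketch of these steps, assuming a hypothetical table path and column names:

# Repro sketch (hypothetical path and columns).
import daft

path = "some/table/path"

df = daft.read_deltalake(path)  # step 1: read the existing table
df = df.with_column(            # step 2: add a column...
    "fare_class",
    (df["fare_amount"] < 20.0).if_else("Economical", "Expensive"),
)
df = df.exclude("trip_distance")  # ...and drop one

# step 3: append with the evolved schema -- currently rejected by Daft
df.write_deltalake(path, mode="append", schema_mode="overwrite")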

Expected behavior

Appends to existing tables should be allowed even when the DataFrame schema has evolved.

Component(s)

Other

Additional context

No response

@anilmenon14 added the bug (Something isn't working) and needs triage labels on Dec 12, 2024
@andrewgazelka added the p2 (Nice to have features) label on Dec 12, 2024
@andrewgazelka (Member)

Thank you for reporting this. @jaychia any thoughts on this?

@jaychia (Contributor)

jaychia commented Dec 12, 2024

Hi @anilmenon14, what is the expected behavior here? Should Daft allow missing/extra/different columns by performing column pruning, or by adding all-null columns?

@anilmenon14 (Contributor, Author)

anilmenon14 commented Dec 15, 2024

Hi @jaychia,
To replicate the behavior of the DataFrame code I used in Daft to trigger appends, I recreated the same experiment, with the exact same dataset, using Spark syntax in a Databricks environment:

# Read the NYC taxi dataset
import pyspark.sql.functions as F
df = spark.read.table('some_catalog.some_schema.nyc_taxi_data')

# Transformations that modify existing columns, add two new columns,
# and drop an existing column
df_transformed = (
    df.withColumns(
        {
            "tpep_pickup_datetime": F.timestamp_add("DAY", F.lit(7), "tpep_pickup_datetime"),
            "tpep_dropoff_datetime": F.timestamp_add("DAY", F.lit(7), "tpep_dropoff_datetime"),
            "trip_distance": F.round(F.col("trip_distance") * 1.2, 2),
            "fare_amount": F.round(F.col("fare_amount") * 1.3, 2),
        }
    )
    .withColumn("trip_distance_km", F.round(F.col("trip_distance") * 1.609344, 2))
    .withColumn("fare_class", F.when(F.col("fare_amount") < 20.0, "Economical").otherwise("Expensive"))
    .drop("trip_distance")
)

# Write the transformed data as an append to the existing table
(
    df_transformed.write
    .mode("append")
    .option("mergeSchema", True)
    .saveAsTable('some_catalog.some_schema.nyc_taxi_data')
)

This is how the append behaves in Spark with the mergeSchema option enabled:
[screenshot of the resulting table omitted]

I forked the current main branch and altered L942 of daft/dataframe/dataframe.py to accept mode="append", to see how Daft's .write_deltalake would behave on an append with schema overwrite.
By doing so, I could execute something like my_daft_df.write_deltalake(unity_table_ext, mode="append", schema_mode="overwrite"); however, it did not correctly lead to schema evolution as observed with Spark. It appeared to create a new Delta log entry with an append transaction, yet strangely, when reading the table again (using either Daft or Spark), the old/initial schema of the table was still in use.
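
To check what actually landed in the Delta log after such an append, a quick inspection with the deltalake package can help; a sketch, with a hypothetical table URI:

# Inspect the table's log after the append (hypothetical URI).
from deltalake import DeltaTable

dt = DeltaTable("some/table/path")
print(dt.version())           # should point at the new append commit
print(dt.schema())            # schema as recorded in the Delta log
print(dt.history(limit=3))    # recent commits, including the append transaction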

In short, I'd expect

  • new columns to be added, with pre-existing rows to hold a null value for these new columns.
  • columns dropped from the DataFrame to retain their values for pre-existing rows, with newly appended rows showing null for those columns (illustrated below).
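
As a toy illustration with hypothetical columns, appending a DataFrame with columns (id, new_col) to a table with columns (id, old_col) would be expected to read back as:

id | old_col | new_col
---+---------+--------
 1 | a       | null      <- pre-existing row: new_col backfilled with null
 2 | null    | b         <- appended row: old_col was dropped from the DataFrame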

Let me know if you'd like me to look into anything else; I'm happy to help.

@jaychia added the data-catalogs (Related to support for Data Catalogs) label on Dec 19, 2024