From b7749baae6862e3b48d0f8646f58e1390502f28a Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Mon, 6 Jan 2025 00:21:02 +0530 Subject: [PATCH 1/5] snapshot streamlining and new configs --- .../materializations/snapshot/helpers.sql | 122 +++-- .../materializations/snapshot/snapshot.sql | 27 +- .../snapshot/snapshot_merge.sql | 29 +- .../materializations/snapshot/strategies.sql | 18 +- .../adapter/simple_snapshot/fixtures.py | 464 ++++++++++++++++++ .../simple_snapshot/test_various_configs.py | 256 ++++++++++ 6 files changed, 855 insertions(+), 61 deletions(-) create mode 100644 tests/functional/adapter/simple_snapshot/fixtures.py create mode 100644 tests/functional/adapter/simple_snapshot/test_various_configs.py diff --git a/dbt/include/teradata/macros/materializations/snapshot/helpers.sql b/dbt/include/teradata/macros/materializations/snapshot/helpers.sql index 6c51c494..fce579d3 100644 --- a/dbt/include/teradata/macros/materializations/snapshot/helpers.sql +++ b/dbt/include/teradata/macros/materializations/snapshot/helpers.sql @@ -15,6 +15,7 @@ {% endmacro %} {% macro teradata__snapshot_staging_table(strategy, source_sql, target_relation) -%} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} with snapshot_query as ( @@ -25,10 +26,16 @@ snapshotted_data as ( select snapshot.*, - {{ strategy.unique_key }} as dbt_unique_key + {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} as snapshot - where dbt_valid_to is null + where + {% if config.get('dbt_valid_to_current') %} + {# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #} + ( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} ), @@ -36,11 +43,11 @@ select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ 
strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), @@ -49,21 +56,21 @@ select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} deletes_source_data as ( select snapshot_query.*, - {{ strategy.unique_key }} as dbt_unique_key + {{ unique_key_fields(strategy.unique_key) }} from snapshot_query ), {% endif %} @@ -73,16 +80,16 @@ select 'insert' as dbt_change_type, source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} + {%- endif %} from insertions_source_data as source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) - ) + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ 
strategy.row_changed }}) + ), @@ -91,16 +98,20 @@ select 'update' as dbt_change_type, source_data.*, - snapshotted_data.dbt_scd_id + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from updates_source_data as source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where ( {{ strategy.row_changed }} ) ) - {%- if strategy.invalidate_hard_deletes -%} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%} , deletes as ( @@ -108,34 +119,75 @@ select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) + {%- endif %} + + {%- if strategy.hard_deletes == 'new_record' %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} + , + deletion_records as ( + + select + 'insert' as dbt_change_type, + {%- for col in source_sql_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + 
{% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} from snapshotted_data - left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} ) {%- endif %} select * from insertions union all select * from updates - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} union all select * from deletes {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} {%- endmacro %} {% macro teradata__build_snapshot_table(strategy, sql) %} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select sbq.*, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }} + {%- if strategy.hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} + {% endif -%} from ( {{ sql }} ) sbq diff --git a/dbt/include/teradata/macros/materializations/snapshot/snapshot.sql 
b/dbt/include/teradata/macros/materializations/snapshot/snapshot.sql index 1451cbb0..bb2f7c00 100644 --- a/dbt/include/teradata/macros/materializations/snapshot/snapshot.sql +++ b/dbt/include/teradata/macros/materializations/snapshot/snapshot.sql @@ -1,5 +1,4 @@ {% materialization snapshot, adapter='teradata' %} - {%- set config = model['config'] -%} -- calling the macro set_query_band() which will set the query_band for this materialization as per the user_configuration {% do set_query_band() %} @@ -27,7 +26,9 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {# The model['config'] parameter below is no longer used, but passing anyway for compatibility #} + {# It was a dictionary of config, instead of the config object from the context #} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} @@ -42,28 +43,32 @@ {% do adapter.drop_relation(make_temp_relation(target_relation)) %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + + {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} -- this may no-op if the database does not require column expansion {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} 
+ {% endfor %} + {% endif %} {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} diff --git a/dbt/include/teradata/macros/materializations/snapshot/snapshot_merge.sql b/dbt/include/teradata/macros/materializations/snapshot/snapshot_merge.sql index 8be1e3cd..f92a2a2e 100644 --- a/dbt/include/teradata/macros/materializations/snapshot/snapshot_merge.sql +++ b/dbt/include/teradata/macros/materializations/snapshot/snapshot_merge.sql @@ -1,11 +1,17 @@ {% macro teradata__snapshot_merge_sql_update(target, source, insert_cols) -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} UPDATE {{ target }} - FROM (SELECT dbt_scd_id, dbt_change_type, dbt_valid_to FROM {{ source }}) AS DBT_INTERNAL_SOURCE - SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to - WHERE DBT_INTERNAL_SOURCE.dbt_scd_id = {{ target }}.dbt_scd_id + FROM (SELECT {{ columns.dbt_scd_id }}, dbt_change_type, {{ columns.dbt_valid_to }} FROM {{ source }}) AS DBT_INTERNAL_SOURCE + SET {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} + WHERE DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = {{ target }}.{{ columns.dbt_scd_id }} AND DBT_INTERNAL_SOURCE.dbt_change_type = 'update' - AND {{ target }}.dbt_valid_to IS NULL + {% if 
config.get("dbt_valid_to_current") %} + AND ({{ target }}.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or + {{ target }}.{{ columns.dbt_valid_to }} is null) + {% else %} + AND {{ target }}.{{ columns.dbt_valid_to }} is null + {% endif %} {% endmacro %} {% macro teradata__snapshot_merge_sql_insert(target, source, insert_cols) -%} @@ -20,10 +26,17 @@ {% endmacro %} {% macro teradata__snapshot_merge_sql_delete(target, source, insert_cols) -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} UPDATE {{ target }} - FROM (SELECT dbt_scd_id, dbt_change_type, dbt_valid_to FROM {{ source }}) AS DBT_INTERNAL_SOURCE - SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to - WHERE DBT_INTERNAL_SOURCE.dbt_scd_id = {{ target }}.dbt_scd_id + FROM (SELECT {{ columns.dbt_scd_id }}, dbt_change_type, {{ columns.dbt_valid_to }} FROM {{ source }}) AS DBT_INTERNAL_SOURCE + SET {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} + WHERE DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = {{ target }}.{{ columns.dbt_scd_id }} AND DBT_INTERNAL_SOURCE.dbt_change_type = 'delete' - AND {{ target }}.dbt_valid_to IS NULL + {% if config.get("dbt_valid_to_current") %} + AND ({{ target }}.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or + {{ target }}.{{ columns.dbt_valid_to }} is null) + {% else %} + AND {{ target }}.{{ columns.dbt_valid_to }} is null + {% endif %} + {% endmacro %} diff --git a/dbt/include/teradata/macros/materializations/snapshot/strategies.sql b/dbt/include/teradata/macros/materializations/snapshot/strategies.sql index a917c482..c0b0b854 100644 --- a/dbt/include/teradata/macros/materializations/snapshot/strategies.sql +++ b/dbt/include/teradata/macros/materializations/snapshot/strategies.sql @@ -49,11 +49,13 @@ {%- endmacro %} {#-- This macro is copied varbatim from dbt-core. 
The only delta is that != operator is replaced with <> #} -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - {% set updated_at = config.get('updated_at', snapshot_get_time()) %} +{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} + {% set check_cols_config = config.get('check_cols') %} + {% set primary_key = config.get('unique_key') %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} + {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} @@ -78,7 +80,8 @@ ) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% set snapshot_hash_udf = config.get('snapshot_hash_udf') %} {% if snapshot_hash_udf is not none %} {% set scd_id_expr = scd_id_expr |replace("HASHROW",snapshot_hash_udf) %} @@ -90,6 +93,7 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} diff --git a/tests/functional/adapter/simple_snapshot/fixtures.py b/tests/functional/adapter/simple_snapshot/fixtures.py new file mode 100644 index 00000000..57385a2e --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/fixtures.py @@ -0,0 +1,464 @@ +snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' 
|| first_name', + ) + }} + + select * from {{target.schema}}.seed + +{% endsnapshot %} +""" + +snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + +create_seed_sql = """ +create table {schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + +create_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + test_valid_from TIMESTAMP, + test_valid_to TIMESTAMP, + test_scd_id BYTE(4), + test_updated_at TIMESTAMP +); +""" + +seed_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'); +insert into 
{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 
'Male', '71.159.238.196', '2016-10-06 01:55:44'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to, + updated_at as test_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed; +""" + +invalidate_sql = """ +-- update records 11 - 21. 
Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1' hour, + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1' hour +where id >= 10 and id <= 20; + +""" + +update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to, + updated_at as test_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + +model_seed_sql = """ +select * from {{target.schema}}.seed +""" + +snapshots_multi_key_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + strategy: timestamp + updated_at: updated_at + unique_key: + - id1 + - id2 + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + + +# multi-key snapshot fixtures + +create_multi_key_seed_sql = """ +create table {schema}.seed ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + +create_multi_key_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), 
+ + -- snapshotting fields + updated_at TIMESTAMP, + test_valid_from TIMESTAMP, + test_valid_to TIMESTAMP, + test_scd_id BYTE(4), + test_updated_at TIMESTAMP +); +""" + +seed_multi_key_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 100, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(2, 200, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(3, 300, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(4, 400, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(5, 500, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(6, 600, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(7, 700, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(8, 800, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 
22:03:56'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(9, 900, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(10, 1000, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(11, 1100, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(12, 1200, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(13, 1300, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(14, 1400, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(15, 1500, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(16, 1600, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(17, 1700, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'); +insert into {schema}.seed (id1, id2, 
first_name, last_name, email, gender, ip_address, updated_at) values +(18, 1800, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(19, 1900, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'); +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(20, 2000, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_multi_key_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to, + updated_at as test_updated_at, + HASHROW(coalesce(cast(id1 as varchar(50)), '') || '|' || coalesce(cast(id2 as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed; +""" + +invalidate_multi_key_sql = """ +-- update records 11 - 21. 
Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1' hour, + email = case when id1 = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id1 >= 10 and id1 <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1' hour +where id1 >= 10 and id1 <= 20; + +""" + + +update_multi_key_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to, + updated_at as test_updated_at, + HASHROW(coalesce(cast(id1 as varchar(50)), '') || '|' || coalesce(cast(id2 as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed +where id1 >= 10 and id1 <= 20; +""" + +snapshots_valid_to_current_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + dbt_valid_to_current: "cast('2099-12-31 00:00:00' as timestamp)" + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +populate_snapshot_expected_valid_to_current_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast('2099-12-31 00:00:00' as timestamp) as test_valid_to, + 
updated_at as test_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed; +""" + +update_with_current_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast('2099-12-31 00:00:00' as timestamp) as test_valid_to, + updated_at as test_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + + +snapshots_no_column_names_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at +""" diff --git a/tests/functional/adapter/simple_snapshot/test_various_configs.py b/tests/functional/adapter/simple_snapshot/test_various_configs.py new file mode 100644 index 00000000..0fdbc14f --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_various_configs.py @@ -0,0 +1,256 @@ +import pytest + +import datetime + +from dbt.tests.util import ( + check_relations_equal, + get_manifest, + run_dbt, + run_dbt_and_capture, + run_sql_with_adapter, + update_config_file, +) + +from fixtures import (snapshot_actual_sql, snapshots_yml, ref_snapshot_sql, + create_seed_sql, create_snapshot_expected_sql, + invalidate_sql, seed_insert_sql, + populate_snapshot_expected_sql, update_sql, + model_seed_sql, snapshots_multi_key_yml, create_multi_key_seed_sql, create_multi_key_snapshot_expected_sql, + seed_multi_key_insert_sql, populate_multi_key_snapshot_expected_sql, invalidate_multi_key_sql, + update_multi_key_sql, 
snapshots_valid_to_current_yml, populate_snapshot_expected_valid_to_current_sql, + update_with_current_sql, snapshots_no_column_names_yml) + +class BaseSnapshotColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_snapshot_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + +class TestSnapshotColumnNamesTeradata(BaseSnapshotColumnNames): + pass + + +class BaseSnapshotColumnNamesFromDbtProject: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_column_names_from_project(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = 
run_dbt(["snapshot"]) + assert len(results) == 1 + + # run_dbt(["test"]) + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + +class TestSnapshotColumnNamesFromDbtProjectTeradata(BaseSnapshotColumnNamesFromDbtProject): + pass + + +class BaseSnapshotInvalidColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_invalid_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + manifest = get_manifest(project.project_root) + snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"] + assert snapshot_node.config.snapshot_meta_column_names == { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + # Change snapshot_meta_columns and look for an error + different_columns = { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_updated_at": "test_updated_at", + } + } + } + } + update_config_file(different_columns, "dbt_project.yml") + + results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False) + assert len(results) == 1 + assert "Compilation Error in snapshot 
snapshot_actual" in log_output + assert "Snapshot target is missing configured columns" in log_output + +class TestSnapshotInvalidColumnNamesTeradata(BaseSnapshotInvalidColumnNames): + pass + +# This uses snapshot_meta_column_names, yaml-only snapshot def, +# and multiple keys +class BaseSnapshotMultiUniqueKey: + @pytest.fixture(scope="class") + def models(self): + return { + "seed.sql": model_seed_sql, + "snapshots.yml": snapshots_multi_key_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_multi_column_unique_key(self, project): + project.run_sql(create_multi_key_seed_sql) + project.run_sql(create_multi_key_snapshot_expected_sql) + project.run_sql(seed_multi_key_insert_sql) + project.run_sql(populate_multi_key_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_multi_key_sql) + project.run_sql(update_multi_key_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestSnapshotMultiUniqueKeyTeradata(BaseSnapshotMultiUniqueKey): + pass + + +class BaseSnapshotDbtValidToCurrent: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_valid_to_current_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_valid_to_current(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_valid_to_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + original_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + assert original_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + assert original_snapshot[9][2] == 
datetime.datetime(2099, 12, 31, 0, 0) + + project.run_sql(invalidate_sql) + + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + updated_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + # assert updated_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + # Original row that was updated now has a non-current (2099/12/31) date + # assert updated_snapshot[9][2] == datetime.datetime(2016, 8, 20, 16, 44, 49) + # # Updated row has a current date + project.run_sql(update_with_current_sql) + assert updated_snapshot[20][2] == datetime.datetime(2099, 12, 31, 0, 0) + + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + +class TestSnapshotDbtValidToCurrentTeradata(BaseSnapshotDbtValidToCurrent): + pass \ No newline at end of file From 3921f424a42d41b24eca7e5d41decf854efa6dfe Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Mon, 6 Jan 2025 10:49:44 +0530 Subject: [PATCH 2/5] resolving import issue --- .../functional/adapter/simple_snapshot/test_various_configs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/simple_snapshot/test_various_configs.py b/tests/functional/adapter/simple_snapshot/test_various_configs.py index 0fdbc14f..8a4038ee 100644 --- a/tests/functional/adapter/simple_snapshot/test_various_configs.py +++ b/tests/functional/adapter/simple_snapshot/test_various_configs.py @@ -11,7 +11,7 @@ update_config_file, ) -from fixtures import (snapshot_actual_sql, snapshots_yml, ref_snapshot_sql, +from tests.functional.adapter.simple_snapshot.fixtures import (snapshot_actual_sql, snapshots_yml, ref_snapshot_sql, create_seed_sql, create_snapshot_expected_sql, invalidate_sql, seed_insert_sql, populate_snapshot_expected_sql, update_sql, From ae704bdd28e6ad83836b69927b193b6d8f3b2b0e Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Mon, 6 Jan 2025 14:05:09 +0530 Subject: [PATCH 3/5] added 
testcases for new_record config --- .../simple_snapshot/new_record_mode.py | 248 ++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 tests/functional/adapter/simple_snapshot/new_record_mode.py diff --git a/tests/functional/adapter/simple_snapshot/new_record_mode.py b/tests/functional/adapter/simple_snapshot/new_record_mode.py new file mode 100644 index 00000000..eae46fcb --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/new_record_mode.py @@ -0,0 +1,248 @@ +import pytest + +from dbt.tests.util import check_relations_equal, run_dbt + +_seed_new_record_mode = """ +create table {schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + +create_snapshot_expected_sql=""" +create table {schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + dbt_valid_from TIMESTAMP, + dbt_valid_to TIMESTAMP, + dbt_scd_id BYTE(4), + dbt_updated_at TIMESTAMP, + dbt_is_deleted varchar(50) +); +""" +seed_insert_sql=""" +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'); +insert into {schema}.seed (id, first_name, last_name, email, 
gender, ip_address, updated_at) values +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'); +insert into 
{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'); +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_snapshot_expected_sql=""" +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + cast(null as timestamp) as dbt_valid_to, + updated_at as dbt_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as 
varchar(50)), '')) as test_scd_id, + 'False' as dbt_is_deleted +from {schema}.seed; +""" + +_snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' || first_name', + ) + }} + + select * from {{target.schema}}.seed + +{% endsnapshot %} +""" + +_snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + hard_deletes: new_record +""" + +_ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + + +_invalidate_sql = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1' hour, + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + dbt_valid_to = updated_at + interval '1' hour +where id >= 10 and id <= 20; + +""" + +_update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + cast(null as timestamp) as dbt_valid_to, + updated_at as dbt_updated_at, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id, + 'False' as dbt_is_deleted +from {schema}.seed +where id >= 10 and id <= 20; +""" + +_delete_sql = """ +delete from {schema}.seed where id = 1 +""" + + +class SnapshotNewRecordMode: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _snapshots_yml, + 
"ref_snapshot.sql": _ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def invalidate_sql(self): + return _invalidate_sql + + @pytest.fixture(scope="class") + def update_sql(self): + return _update_sql + + @pytest.fixture(scope="class") + def delete_sql(self): + return _delete_sql + + def test_snapshot_new_record_mode( + self, project, invalidate_sql, update_sql + ): + project.run_sql(_seed_new_record_mode) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + project.run_sql(_delete_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + +class TestSnapshotNewRecordModeTeradata(SnapshotNewRecordMode): + pass \ No newline at end of file From 0de43b302feff3b6526dfe4799855a0f8152f5e2 Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Tue, 7 Jan 2025 17:29:40 +0530 Subject: [PATCH 4/5] addition of code for validation of snapshot tables in testcases --- CHANGELOG.md | 6 ++ .../adapter/simple_snapshot/fixtures.py | 24 +++---- .../simple_snapshot/test_various_configs.py | 64 +++++++++++++++++-- 3 files changed, 77 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5cd4776..4f0162e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,15 @@ ## dbt-teradata 1.0.0a ### Features +* Addition of new Snapshot features with dbt-teradata v1.9 ([#207](https://github.com/Teradata/dbt-teradata/pull/207)): + * Enable setting a datetime value for dbt_valid_to for current records instead of NULL + * Enable hard_deletes config to add a metadata column if a record has been deleted + * Allow customizing names of metadata fields + * Enable unique_key as a list ### Fixes ### Docs ### 
Under the hood +* Addition of testcases for Snapshot diff --git a/tests/functional/adapter/simple_snapshot/fixtures.py b/tests/functional/adapter/simple_snapshot/fixtures.py index 57385a2e..9976a0e9 100644 --- a/tests/functional/adapter/simple_snapshot/fixtures.py +++ b/tests/functional/adapter/simple_snapshot/fixtures.py @@ -115,10 +115,10 @@ gender, ip_address, updated_at, - test_valid_from, - test_valid_to, + test_scd_id, test_updated_at, - test_scd_id + test_valid_from, + test_valid_to ) select @@ -130,10 +130,10 @@ ip_address, updated_at, -- fields added by snapshotting - updated_at as test_valid_from, - cast(null as timestamp) as test_valid_to, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id, updated_at as test_updated_at, - HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to from {schema}.seed; """ @@ -163,10 +163,10 @@ gender, ip_address, updated_at, - test_valid_from, - test_valid_to, + test_scd_id, test_updated_at, - test_scd_id + test_valid_from, + test_valid_to ) select @@ -178,10 +178,10 @@ ip_address, updated_at, -- fields added by snapshotting - updated_at as test_valid_from, - cast(null as timestamp) as test_valid_to, + HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id, updated_at as test_updated_at, - HASHROW(coalesce(cast(id || '-' || first_name as varchar(50)), '') || '|' || coalesce(cast(updated_at as varchar(50)), '')) as test_scd_id + updated_at as test_valid_from, + cast(null as timestamp) as test_valid_to from {schema}.seed where id >= 10 and id <= 20; """ diff --git a/tests/functional/adapter/simple_snapshot/test_various_configs.py b/tests/functional/adapter/simple_snapshot/test_various_configs.py 
index 8a4038ee..fe9b7b41 100644 --- a/tests/functional/adapter/simple_snapshot/test_various_configs.py +++ b/tests/functional/adapter/simple_snapshot/test_various_configs.py @@ -9,6 +9,7 @@ run_dbt_and_capture, run_sql_with_adapter, update_config_file, + relation_from_name, ) from tests.functional.adapter.simple_snapshot.fixtures import (snapshot_actual_sql, snapshots_yml, ref_snapshot_sql, @@ -47,7 +48,21 @@ def test_snapshot_column_names(self, project): results = run_dbt(["snapshot"]) assert len(results) == 1 - # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + relation_actual = relation_from_name(project.adapter, "snapshot_actual") + relation_expected = relation_from_name(project.adapter, "snapshot_expected") + result = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected}", fetch="one") + + # if two expected and actual snapshot tables are equal then the result varible would be None, as there would no difference between the two relations + assert result == None + + result2 = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual}", fetch="one") + assert result2 == None + + # check_relations_equal_snapshot(project.adapter, ["snapshot_actual", "snapshot_expected"]) class TestSnapshotColumnNamesTeradata(BaseSnapshotColumnNames): pass @@ -95,6 +110,19 @@ def test_snapshot_column_names_from_project(self, project): results = run_dbt(["snapshot"]) assert 
len(results) == 1 + relation_actual = relation_from_name(project.adapter, "snapshot_actual") + relation_expected = relation_from_name(project.adapter, "snapshot_expected") + result1 = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected}", fetch="one") + + # if two expected and actual snapshot tables are equal then the result varible would be None, as there would no difference between the two relations + assert result1 == None + + result2 = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual}", fetch="one") + assert result2 == None # run_dbt(["test"]) # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) @@ -196,6 +224,21 @@ def test_multi_column_unique_key(self, project): results = run_dbt(["snapshot"]) assert len(results) == 1 + + relation_actual = relation_from_name(project.adapter, "snapshot_actual") + relation_expected = relation_from_name(project.adapter, "snapshot_expected") + result = project.run_sql(f"select id1, id2, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual} \ + minus \ + select id1, id2, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected}", fetch="one") + + # if two expected and actual snapshot tables are equal then the result varible would be None, as 
there would no difference between the two relations + assert result == None + + result2 = project.run_sql(f"select id1, id2, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected} \ + minus \ + select id1, id2, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual}", fetch="one") + assert result2 == None + # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) @@ -243,13 +286,24 @@ def test_valid_to_current(self, project): "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", "all", ) - # assert updated_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) - # Original row that was updated now has a non-current (2099/12/31) date - # assert updated_snapshot[9][2] == datetime.datetime(2016, 8, 20, 16, 44, 49) - # # Updated row has a current date + project.run_sql(update_with_current_sql) assert updated_snapshot[20][2] == datetime.datetime(2099, 12, 31, 0, 0) + relation_actual = relation_from_name(project.adapter, "snapshot_actual") + relation_expected = relation_from_name(project.adapter, "snapshot_expected") + + result = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected}", fetch="one") + + # if two expected and actual snapshot tables are equal then the result varible would be None, as there would no difference between the two relations + assert result == None + + result2 = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_expected} \ 
+ minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, test_valid_from, test_valid_to, test_scd_id, test_updated_at from {relation_actual}", fetch="one") + assert result2 == None # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) class TestSnapshotDbtValidToCurrentTeradata(BaseSnapshotDbtValidToCurrent): From d441ae173f584b21bde4b74c2089eb8b477403c8 Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Tue, 7 Jan 2025 17:49:02 +0530 Subject: [PATCH 5/5] added validation of snapshot table in new_record_mode testcase --- .../adapter/simple_snapshot/new_record_mode.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/functional/adapter/simple_snapshot/new_record_mode.py b/tests/functional/adapter/simple_snapshot/new_record_mode.py index eae46fcb..74951bfe 100644 --- a/tests/functional/adapter/simple_snapshot/new_record_mode.py +++ b/tests/functional/adapter/simple_snapshot/new_record_mode.py @@ -1,6 +1,6 @@ import pytest -from dbt.tests.util import check_relations_equal, run_dbt +from dbt.tests.util import check_relations_equal, run_dbt, relation_from_name _seed_new_record_mode = """ create table {schema}.seed ( @@ -237,6 +237,20 @@ def test_snapshot_new_record_mode( results = run_dbt(["snapshot"]) assert len(results) == 1 + relation_actual = relation_from_name(project.adapter, "snapshot_actual") + relation_expected = relation_from_name(project.adapter, "snapshot_expected") + + result = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, dbt_valid_from, dbt_valid_to, dbt_scd_id, dbt_updated_at, dbt_is_deleted from {relation_actual} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, dbt_valid_from, dbt_valid_to, dbt_scd_id, dbt_updated_at, dbt_is_deleted from {relation_expected}", fetch="one") + + # if two expected and actual snapshot tables are equal then the result varible would be None, as there 
would be no difference between the two relations + assert result == None + + result2 = project.run_sql(f"select id, first_name, last_name, email, gender, ip_address, updated_at, dbt_valid_from, dbt_valid_to, dbt_scd_id, dbt_updated_at, dbt_is_deleted from {relation_expected} \ + minus \ + select id, first_name, last_name, email, gender, ip_address, updated_at, dbt_valid_from, dbt_valid_to, dbt_scd_id, dbt_updated_at, dbt_is_deleted from {relation_actual}", fetch="one") + assert result2 == None # check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) project.run_sql(_delete_sql)