Skip to content

Commit

Permalink
Merge pull request #207 from Teradata/vs255034_IDE-24784
Browse files Browse the repository at this point in the history
snapshot streamlining and new configs
  • Loading branch information
tallamohan authored Jan 9, 2025
2 parents 1fdd3b3 + d441ae1 commit f6ff463
Show file tree
Hide file tree
Showing 8 changed files with 1,177 additions and 61 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
## dbt-teradata 1.0.0a

### Features
* Addition of new Snapshot features with dbt-teradata v1.9 ([#207](https://github.com/Teradata/dbt-teradata/pull/207)):
* Enable setting a datetime value for dbt_valid_to for current records instead of NULL
* Enable hard_deletes config to add a metadata column if a record has been deleted
* Allow customizing names of metadata fields
* Enable unique_key as a list

### Fixes

### Docs

### Under the hood
* Addition of testcases for Snapshot
122 changes: 87 additions & 35 deletions dbt/include/teradata/macros/materializations/snapshot/helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
{% endmacro %}

{% macro teradata__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

with snapshot_query as (

Expand All @@ -25,22 +26,28 @@
snapshotted_data as (

select snapshot.*,
{{ strategy.unique_key }} as dbt_unique_key
{{ unique_key_fields(strategy.unique_key) }}

from {{ target_relation }} as snapshot
where dbt_valid_to is null
where
{% if config.get('dbt_valid_to_current') %}
{# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #}
( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null)
{% else %}
{{ columns.dbt_valid_to }} is null
{% endif %}

),

insertions_source_data as (

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}

from snapshot_query
),
Expand All @@ -49,21 +56,21 @@

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

deletes_source_data as (

select
snapshot_query.*,
{{ strategy.unique_key }} as dbt_unique_key
{{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}
Expand All @@ -73,16 +80,16 @@
select
'insert' as dbt_change_type,
source_data.*
{%- if strategy.hard_deletes == 'new_record' -%}
,'False' as {{ columns.dbt_is_deleted }}
{%- endif %}

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
)
left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ strategy.row_changed }})


),

Expand All @@ -91,51 +98,96 @@
select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where (
{{ strategy.row_changed }}
)
)

{%- if strategy.invalidate_hard_deletes -%}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

{%- if strategy.hard_deletes == 'new_record' %}
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
,

deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.dbt_unique_key as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}

{%- endmacro %}

{% macro teradata__build_snapshot_table(strategy, sql) %}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

select sbq.*,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%}
, 'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
{{ sql }}
) sbq
Expand Down
27 changes: 16 additions & 11 deletions dbt/include/teradata/macros/materializations/snapshot/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{% materialization snapshot, adapter='teradata' %}
{%- set config = model['config'] -%}

-- calling the macro set_query_band() which will set the query_band for this materialization as per the user_configuration
{% do set_query_band() %}
Expand Down Expand Up @@ -27,7 +26,9 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{# The model['config'] parameter below is no longer used, but passing anyway for compatibility #}
{# It was a dictionary of config, instead of the config object from the context #}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

Expand All @@ -42,28 +43,32 @@

{% do adapter.drop_relation(make_temp_relation(target_relation)) %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@

{% macro teradata__snapshot_merge_sql_update(target, source, insert_cols) -%}
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
UPDATE {{ target }}
FROM (SELECT dbt_scd_id, dbt_change_type, dbt_valid_to FROM {{ source }}) AS DBT_INTERNAL_SOURCE
SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
WHERE DBT_INTERNAL_SOURCE.dbt_scd_id = {{ target }}.dbt_scd_id
FROM (SELECT {{ columns.dbt_scd_id }}, dbt_change_type, {{ columns.dbt_valid_to }} FROM {{ source }}) AS DBT_INTERNAL_SOURCE
SET {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
WHERE DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = {{ target }}.{{ columns.dbt_scd_id }}
AND DBT_INTERNAL_SOURCE.dbt_change_type = 'update'
AND {{ target }}.dbt_valid_to IS NULL
{% if config.get("dbt_valid_to_current") %}
AND ({{ target }}.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or
{{ target }}.{{ columns.dbt_valid_to }} is null)
{% else %}
AND {{ target }}.{{ columns.dbt_valid_to }} is null
{% endif %}
{% endmacro %}

{% macro teradata__snapshot_merge_sql_insert(target, source, insert_cols) -%}
Expand All @@ -20,10 +26,17 @@
{% endmacro %}

{% macro teradata__snapshot_merge_sql_delete(target, source, insert_cols) -%}
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
UPDATE {{ target }}
FROM (SELECT dbt_scd_id, dbt_change_type, dbt_valid_to FROM {{ source }}) AS DBT_INTERNAL_SOURCE
SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
WHERE DBT_INTERNAL_SOURCE.dbt_scd_id = {{ target }}.dbt_scd_id
FROM (SELECT {{ columns.dbt_scd_id }}, dbt_change_type, {{ columns.dbt_valid_to }} FROM {{ source }}) AS DBT_INTERNAL_SOURCE
SET {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
WHERE DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = {{ target }}.{{ columns.dbt_scd_id }}
AND DBT_INTERNAL_SOURCE.dbt_change_type = 'delete'
AND {{ target }}.dbt_valid_to IS NULL
{% if config.get("dbt_valid_to_current") %}
AND ({{ target }}.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or
{{ target }}.{{ columns.dbt_valid_to }} is null)
{% else %}
AND {{ target }}.{{ columns.dbt_valid_to }} is null
{% endif %}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
{%- endmacro %}

{#-- This macro is copied varbatim from dbt-core. The only delta is that != operator is replaced with <> #}
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}
{% set updated_at = config.get('updated_at', snapshot_get_time()) %}
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %}
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set check_cols_config = config.get('check_cols') %}
{% set primary_key = config.get('unique_key') %}
{% set hard_deletes = adapter.get_hard_deletes_behavior(config) %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set updated_at = config.get('updated_at') or snapshot_get_time() %}

{% set column_added = false %}

Expand All @@ -78,7 +80,8 @@
)
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
{% set scd_args = api.Relation.scd_args(primary_key, updated_at) %}
{% set scd_id_expr = snapshot_hash_arguments(scd_args) %}
{% set snapshot_hash_udf = config.get('snapshot_hash_udf') %}
{% if snapshot_hash_udf is not none %}
{% set scd_id_expr = scd_id_expr |replace("HASHROW",snapshot_hash_udf) %}
Expand All @@ -90,6 +93,7 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}
Loading

0 comments on commit f6ff463

Please sign in to comment.