Skip to content

Commit

Permalink
Transfers incremental predicate (#6046)
Browse files Browse the repository at this point in the history
* Update transfer models to use incremental predicate macro

* Update transfer macros to use incremental predicate macro

* More updates

* Remove schema

* Add incremental_predicates to merge intos
  • Loading branch information
aalan3 authored Jun 3, 2024
1 parent c1b5417 commit d108e3d
Show file tree
Hide file tree
Showing 40 changed files with 356 additions and 456 deletions.
2 changes: 0 additions & 2 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,6 @@ models:
+schema: transfers_arbitrum
polygon:
+schema: transfers_polygon
fantom:
+schema: transfers_fantom
base:
+schema: transfers_base
celo:
Expand Down
84 changes: 42 additions & 42 deletions macros/models/_sector/transfers/erc20/transfers_erc20.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,98 +4,98 @@ WITH


erc20_transfers as (
SELECT
'receive' as transfer_type,
SELECT
'receive' as transfer_type,
evt_tx_hash,
evt_index,
evt_index,
evt_block_time,
to as wallet_address,
to as wallet_address,
contract_address as token_address,
CAST(value as double) as amount_raw
FROM
FROM
{{ erc20_evt_transfer }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}

UNION ALL
UNION ALL

SELECT
'send' as transfer_type,
SELECT
'send' as transfer_type,
evt_tx_hash,
evt_index,
evt_index,
evt_block_time,
"from" as wallet_address,
"from" as wallet_address,
contract_address as token_address,
-CAST(value as double) as amount_raw
FROM
FROM
{{ erc20_evt_transfer }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}
)

{% if wrapped_token_deposit and wrapped_token_withdrawal %}
, wrapped_token_events as (
SELECT
'deposit' as transfer_type,
evt_tx_hash,
evt_index,
SELECT
'deposit' as transfer_type,
evt_tx_hash,
evt_index,
evt_block_time,
dst as wallet_address,
contract_address as token_address,
dst as wallet_address,
contract_address as token_address,
CAST(wad as double)as amount_raw
FROM
FROM
{{ wrapped_token_deposit }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}

UNION ALL
UNION ALL

SELECT
'withdraw' as transfer_type,
evt_tx_hash,
evt_index,
SELECT
'withdraw' as transfer_type,
evt_tx_hash,
evt_index,
evt_block_time,
src as wallet_address,
contract_address as token_address,
src as wallet_address,
contract_address as token_address,
-CAST(wad as double)as amount_raw
FROM
FROM
{{ wrapped_token_withdrawal }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}
)
{% endif %}

SELECT
'{{blockchain}}' as blockchain,
'{{blockchain}}' as blockchain,
transfer_type,
evt_tx_hash,
evt_tx_hash,
evt_index,
evt_block_time,
CAST(date_trunc('month', evt_block_time) as date) as block_month,
wallet_address,
token_address,
wallet_address,
token_address,
amount_raw
FROM
FROM
erc20_transfers

{% if wrapped_token_deposit and wrapped_token_withdrawal %}
UNION ALL
UNION ALL

SELECT
'{{blockchain}}' as blockchain,
SELECT
'{{blockchain}}' as blockchain,
transfer_type,
evt_tx_hash,
evt_tx_hash,
evt_index,
evt_block_time,
CAST(date_trunc('month', evt_block_time) as date) as block_month,
wallet_address,
token_address,
wallet_address,
token_address,
amount_raw
FROM
FROM
wrapped_token_events
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ SELECT
t.symbol,
sum(tr.amount_raw) as amount_raw,
sum(tr.amount_raw / power(10, t.decimals)) as amount
FROM
FROM
{{ transfers_erc20 }} tr
LEFT JOIN
LEFT JOIN
{{ tokens_erc20 }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE tr.evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('tr.evt_block_time') }}
{% endif %}
GROUP BY 1, 2, 3, 4, 5, 6

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ SELECT
t.symbol,
sum(tr.amount_raw) as amount_raw,
sum(tr.amount_raw / power(10, t.decimals)) as amount
FROM
FROM
{{ transfers_erc20 }} tr
LEFT JOIN
LEFT JOIN
{{ tokens_erc20 }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE tr.evt_block_time >= date_trunc('hour', now() - interval '3' Day)
WHERE {{ incremental_predicate('tr.evt_block_time') }}
{% endif %}
GROUP BY 1, 2, 3, 4, 5, 6

Expand Down
43 changes: 22 additions & 21 deletions models/transfers/arbitrum/erc20/transfers_arbitrum_erc20.sql
Original file line number Diff line number Diff line change
@@ -1,59 +1,60 @@
{{ config(

materialized = 'incremental',
partition_by = ['block_month'],
file_format = 'delta',
incremental_strategy = 'merge',
unique_key = ['transfer_type', 'evt_tx_hash', 'evt_index', 'wallet_address'],
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.evt_block_time')],
unique_key = ['transfer_type', 'evt_tx_hash', 'evt_index', 'wallet_address'],
alias = 'erc20',
post_hook='{{ expose_spells(\'["arbitrum"]\',
"sector",
"transfers",
\'["Henrystats"]\') }}') }}

WITH
WITH

erc20_transfers as (
SELECT
'receive' as transfer_type,
SELECT
'receive' as transfer_type,
evt_tx_hash,
evt_index,
evt_index,
evt_block_time,
to as wallet_address,
to as wallet_address,
contract_address as token_address,
CAST(value as double) as amount_raw
FROM
FROM
{{ source('erc20_arbitrum', 'evt_transfer') }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}

UNION ALL
UNION ALL

SELECT
'send' as transfer_type,
SELECT
'send' as transfer_type,
evt_tx_hash,
evt_index,
evt_index,
evt_block_time,
"from" as wallet_address,
"from" as wallet_address,
contract_address as token_address,
-CAST(value as double) as amount_raw
FROM
FROM
{{ source('erc20_arbitrum', 'evt_transfer') }}
{% if is_incremental() %}
WHERE evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('evt_block_time') }}
{% endif %}
)

SELECT
'arbitrum' as blockchain,
'arbitrum' as blockchain,
transfer_type,
evt_tx_hash,
evt_tx_hash,
evt_index,
evt_block_time,
CAST(date_trunc('month', evt_block_time) as date) as block_month,
wallet_address,
token_address,
wallet_address,
token_address,
amount_raw
FROM
FROM
erc20_transfers
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{ config(

alias = 'erc20_agg_day',
materialized ='incremental',
partition_by = ['block_month'],
file_format ='delta',
incremental_strategy='merge',
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.block_day')],
unique_key = ['block_day', 'wallet_address', 'token_address']
)
}}
Expand All @@ -18,12 +19,12 @@ select
t.symbol,
sum(tr.amount_raw) as amount_raw,
sum(tr.amount_raw / power(10, t.decimals)) as amount
FROM
FROM
{{ ref('transfers_arbitrum_erc20') }} tr
LEFT JOIN
LEFT JOIN
{{ source('tokens_arbitrum', 'erc20') }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE tr.evt_block_time >= date_trunc('day', now() - interval '3' Day)
WHERE {{ incremental_predicate('tr.evt_block_time') }}
{% endif %}
GROUP BY 1, 2, 3, 4, 5, 6
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{ config(

alias = 'erc20_agg_hour',
materialized ='incremental',
partition_by = ['block_month'],
file_format ='delta',
incremental_strategy='merge',
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.block_hour')],
unique_key = ['block_hour', 'wallet_address', 'token_address']
)
}}
Expand All @@ -18,12 +19,12 @@ select
t.symbol,
sum(tr.amount_raw) as amount_raw,
sum(tr.amount_raw / power(10, t.decimals)) as amount
FROM
FROM
{{ ref('transfers_arbitrum_erc20') }} tr
LEFT JOIN
LEFT JOIN
{{ source('tokens_arbitrum', 'erc20') }} t on t.contract_address = tr.token_address
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE tr.evt_block_time >= date_trunc('hour', now() - interval '3' Day)
WHERE {{ incremental_predicate('tr.evt_block_time') }}
{% endif %}
GROUP BY 1, 2, 3, 4, 5, 6
11 changes: 6 additions & 5 deletions models/transfers/base/erc20/transfers_base_erc20.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{{ config(
alias = 'erc20',

materialized ='incremental',
file_format ='delta',
incremental_strategy='merge',
incremental_predicates = [incremental_predicate('DBT_INTERNAL_DEST.evt_block_time')],
unique_key='unique_transfer_id',
post_hook='{{ expose_spells(\'["base"]\',
"sector",
Expand All @@ -22,7 +23,7 @@ with
{{ source('erc20_base', 'evt_transfer') }}
where 1=1
{% if is_incremental() %} -- this filter will only be applied on an incremental run
and evt_block_time >= date_trunc('day', now() - interval '7' day)
and {{ incremental_predicate('evt_block_time') }}
{% endif %}
)

Expand All @@ -38,7 +39,7 @@ with
{{ source('erc20_base', 'evt_transfer') }}
where 1=1
{% if is_incremental() %} -- this filter will only be applied on an incremental run
and evt_block_time >= date_trunc('day', now() - interval '7' day)
and {{ incremental_predicate('evt_block_time') }}
{% endif %}

)
Expand All @@ -56,7 +57,7 @@ with
WHERE contract_address = 0x4200000000000000000000000000000000000006
AND topic0 = 0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c --deposit
{% if is_incremental() %} -- this filter will only be applied on an incremental run
and block_time >= date_trunc('day', now() - interval '7' day)
and {{ incremental_predicate('block_time') }}
{% endif %}
)

Expand All @@ -73,7 +74,7 @@ with
WHERE contract_address = 0x4200000000000000000000000000000000000006
AND topic0 = 0x7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65 --withdrawal
{% if is_incremental() %} -- this filter will only be applied on an incremental run
and block_time >= date_trunc('day', now() - interval '7' day)
and {{ incremental_predicate('block_time') }}
{% endif %}
)

Expand Down
Loading

0 comments on commit d108e3d

Please sign in to comment.