From d71174b8a4af79883394bcf0ace79b71f015c0fc Mon Sep 17 00:00:00 2001 From: Austin <93135983+austinFlipside@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:27:46 -0400 Subject: [PATCH] hot swap (#85) * hot swap * index * dash --- .../workflows/dbt_run_incremental_temp.yml | 45 -- models/gold/core/core__ez_eth_transfers.sql | 8 +- models/silver/core/silver__traces.sql | 423 +++++++++--------- models/silver/core/silver__traces2.sql | 410 ----------------- 4 files changed, 224 insertions(+), 662 deletions(-) delete mode 100644 .github/workflows/dbt_run_incremental_temp.yml delete mode 100644 models/silver/core/silver__traces2.sql diff --git a/.github/workflows/dbt_run_incremental_temp.yml b/.github/workflows/dbt_run_incremental_temp.yml deleted file mode 100644 index 6b45ccf6..00000000 --- a/.github/workflows/dbt_run_incremental_temp.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: dbt_run_scheduled_temp -run-name: dbt_run_scheduled_temp - -on: - workflow_dispatch: - schedule: - # Runs every 2 hours on the 40th minute - - cron: "50 */2 * * *" - -env: - DBT_PROFILES_DIR: ./ - - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod_backfill - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - name: Run DBT Jobs - run: | - dbt run -m models/silver/core/silver__traces2.sql --vars '{"TRACES_BLOCKS":500000}' \ No newline at end of file diff --git a/models/gold/core/core__ez_eth_transfers.sql b/models/gold/core/core__ez_eth_transfers.sql index 50c60658..7ec4acfb 100644 --- a/models/gold/core/core__ez_eth_transfers.sql +++ b/models/gold/core/core__ez_eth_transfers.sql @@ -21,7 +21,9 @@ WITH eth_base AS ( _call_id, _inserted_timestamp, eth_value_precise_raw, - eth_value_precise + eth_value_precise, + tx_position, + trace_index FROM {{ ref('silver__traces') }} WHERE @@ -82,7 +84,9 @@ SELECT 2 ) AS amount_usd, _call_id, - _inserted_timestamp + _inserted_timestamp, + tx_position, + trace_index FROM eth_base A LEFT JOIN {{ ref('silver__hourly_prices_priority_eth') }} diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql index 813aca8a..bb35ca73 100644 --- a/models/silver/core/silver__traces.sql +++ b/models/silver/core/silver__traces.sql @@ -5,10 +5,11 @@ unique_key = "block_number", cluster_by = "block_timestamp::date, _inserted_timestamp::date", post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", + full_refresh = false, tags = ['core','non_realtime'] ) }} -WITH traces_txs AS ( +WITH bronze_traces AS ( SELECT block_number, @@ -26,42 +27,73 @@ WHERE FROM {{ this }} ) + AND DATA :result IS NOT NULL {% else %} {{ ref('bronze__streamline_FR_traces') }} WHERE - _partition_by_block_id <= 5000000 + _partition_by_block_id <= 2300000 + AND DATA :result IS NOT NULL {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position ORDER BY _inserted_timestamp DESC)) = 1 ), -base_table AS ( +flatten_traces AS ( SELECT + block_number, + tx_position, + IFF( + path IN ( + 'result', + 'result.value', + 'result.type', + 'result.to', + 'result.input', + 'result.gasUsed', + 'result.gas', + 'result.from', + 'result.output', + 'result.error', + 'result.revertReason', + 'gasUsed', + 'gas', + 'type', + 'to', + 'from', + 'value', + 'input', + 'error', + 'output', + 'revertReason' + ), + 'ORIGIN', + REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') + ) AS trace_address, + _inserted_timestamp, + OBJECT_AGG( + key, + VALUE + ) AS trace_json, CASE + WHEN trace_address = 'ORIGIN' THEN NULL WHEN POSITION( - '.', - path :: STRING - ) > 0 THEN REPLACE( - REPLACE( - path :: STRING, - SUBSTR(path :: STRING, len(path :: STRING) - POSITION('.', REVERSE(path :: STRING)) + 1, POSITION('.', REVERSE(path :: STRING))), - '' - ), - '.', - '__' + '_' IN trace_address + ) = 0 THEN 'ORIGIN' + ELSE REGEXP_REPLACE( + trace_address, + '_[0-9]+$', + '', + 1, + 1 ) - ELSE '__' - END AS id, - OBJECT_AGG( - DISTINCT key, - VALUE - ) AS DATA, - txs.tx_position AS tx_position, - txs.block_number AS block_number, - txs._inserted_timestamp AS _inserted_timestamp + END AS parent_trace_address, + SPLIT( + trace_address, + '_' + ) AS str_array FROM - traces_txs txs, + bronze_traces txs, TABLE( FLATTEN( input => PARSE_JSON( @@ -73,196 +105,179 @@ base_table AS ( WHERE f.index IS NULL AND f.key != 'calls' + AND f.path != 'result' GROUP BY - tx_position, - id, block_number, + tx_position, + trace_address, _inserted_timestamp ), -flattened_traces AS ( +sub_traces AS ( + SELECT + block_number, + tx_position, + parent_trace_address, + COUNT(*) AS sub_traces + FROM + flatten_traces + GROUP BY + block_number, + tx_position, + parent_trace_address +), +num_array AS ( SELECT - DATA :from :: STRING AS from_address, + block_number, + tx_position, + trace_address, + ARRAY_AGG(flat_value) AS num_array + FROM + ( + SELECT + block_number, + tx_position, + trace_address, + IFF( + VALUE :: STRING = 'ORIGIN', + -1, + VALUE :: INT + ) AS flat_value + FROM + flatten_traces, + LATERAL FLATTEN ( + input => str_array + ) + ) + GROUP BY + block_number, + tx_position, + trace_address +), +cleaned_traces AS ( + SELECT + b.block_number, + b.tx_position, + b.trace_address, + IFNULL( + sub_traces, + 0 + ) AS sub_traces, + num_array, + ROW_NUMBER() over ( + PARTITION BY b.block_number, + b.tx_position + ORDER BY + num_array ASC + ) - 1 AS trace_index, + trace_json, + b._inserted_timestamp + FROM + flatten_traces b + LEFT JOIN sub_traces s + ON b.block_number = s.block_number + AND b.tx_position = s.tx_position + AND b.trace_address = s.parent_trace_address + JOIN num_array n + ON b.block_number = n.block_number + AND b.tx_position = n.tx_position + AND b.trace_address = n.trace_address +), +final_traces AS ( + SELECT + tx_position, + trace_index, + block_number, + trace_address, + trace_json :error :: STRING AS error_reason, + trace_json :from :: STRING AS from_address, + trace_json :to :: STRING AS to_address, + IFNULL( + utils.udf_hex_to_int( + trace_json :value :: STRING + ), + '0' + ) AS eth_value_precise_raw, + utils.udf_decimal_adjust( + eth_value_precise_raw, + 18 + ) AS eth_value_precise, + eth_value_precise :: FLOAT AS eth_value, utils.udf_hex_to_int( - DATA :gas :: STRING - ) AS gas, + trace_json :gas :: STRING + ) :: INT AS gas, utils.udf_hex_to_int( - DATA :gasUsed :: STRING - ) AS gas_used, - DATA :input :: STRING AS input, - DATA :output :: STRING AS output, - DATA :error :: STRING AS error_reason, - DATA :to :: STRING AS to_address, - DATA :type :: STRING AS TYPE, - CASE - WHEN DATA :type :: STRING = 'CALL' THEN utils.udf_hex_to_int( - DATA :value :: STRING - ) / pow( - 10, - 18 - ) - ELSE 0 - END AS eth_value, - CASE - WHEN id = '__' THEN CONCAT( - DATA :type :: STRING, - '_ORIGIN' - ) - ELSE CONCAT( - DATA :type :: STRING, - '_', - REPLACE( - REPLACE(REPLACE(REPLACE(id, 'calls', ''), '[', ''), ']', ''), - '__', - '_' - ) - ) - END AS identifier, + trace_json :gasUsed :: STRING + ) :: INT AS gas_used, + trace_json :input :: STRING AS input, + trace_json :output :: STRING AS output, + trace_json :type :: STRING AS TYPE, + concat_ws( + '_', + TYPE, + trace_address + ) AS identifier, concat_ws( '-', block_number, tx_position, identifier ) AS _call_id, - SPLIT( - identifier, - '_' - ) AS id_split, - ARRAY_SLICE(id_split, 1, ARRAY_SIZE(id_split)) AS levels, - ARRAY_TO_STRING( - levels, - '_' - ) AS LEVEL, - CASE - WHEN ARRAY_SIZE(levels) = 1 - AND levels [0] :: STRING = 'ORIGIN' THEN NULL - WHEN ARRAY_SIZE(levels) = 1 THEN 'ORIGIN' - ELSE ARRAY_TO_STRING(ARRAY_SLICE(levels, 0, ARRAY_SIZE(levels) -1), '_')END AS parent_level, - COUNT(parent_level) over ( - PARTITION BY block_number, - tx_position, - parent_level - ) AS sub_traces,* - FROM - base_table - ), - group_sub_traces AS ( - SELECT - tx_position, - block_number, - parent_level, - sub_traces - FROM - flattened_traces - GROUP BY - tx_position, - block_number, - parent_level, - sub_traces - ), - add_sub_traces AS ( - SELECT - flattened_traces.tx_position AS tx_position, - flattened_traces.block_number :: INTEGER AS block_number, - flattened_traces.error_reason AS error_reason, - flattened_traces.from_address AS from_address, - flattened_traces.to_address AS to_address, - flattened_traces.eth_value :: FLOAT AS eth_value, - flattened_traces.gas :: FLOAT AS gas, - flattened_traces.gas_used :: FLOAT AS gas_used, - flattened_traces.input AS input, - flattened_traces.output AS output, - flattened_traces.type AS TYPE, - flattened_traces.identifier AS identifier, - flattened_traces._call_id AS _call_id, - flattened_traces.data AS DATA, - group_sub_traces.sub_traces AS sub_traces, - ROW_NUMBER() over( - PARTITION BY flattened_traces.block_number, - flattened_traces.tx_position - ORDER BY - flattened_traces.gas :: FLOAT DESC, - flattened_traces.eth_value :: FLOAT ASC, - flattened_traces.to_address - ) AS trace_index, - flattened_traces._inserted_timestamp AS _inserted_timestamp - FROM - flattened_traces - LEFT OUTER JOIN group_sub_traces - ON flattened_traces.tx_position = group_sub_traces.tx_position - AND flattened_traces.level = group_sub_traces.parent_level - AND flattened_traces.block_number = group_sub_traces.block_number - ), - final_traces AS ( - SELECT - tx_position, - trace_index, - block_number, - error_reason, - from_address, - to_address, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - _call_id, - _inserted_timestamp, - DATA, - sub_traces - FROM - add_sub_traces - WHERE - identifier IS NOT NULL - ), - new_records AS ( - SELECT - f.block_number, - t.tx_hash, - t.block_timestamp, - t.tx_status, - f.tx_position, - f.trace_index, - f.from_address, - f.to_address, - f.eth_value, - f.gas, - f.gas_used, - f.input, - f.output, - f.type, - f.identifier, - f.sub_traces, - f.error_reason, - CASE - WHEN f.error_reason IS NULL THEN 'SUCCESS' - ELSE 'FAIL' - END AS trace_status, - f.data, - CASE - WHEN t.tx_hash IS NULL - OR t.block_timestamp IS NULL - OR t.tx_status IS NULL THEN TRUE - ELSE FALSE - END AS is_pending, - f._call_id, - f._inserted_timestamp - FROM - final_traces f - LEFT OUTER JOIN {{ ref('silver__transactions') }} - t - ON f.tx_position = t.position - AND f.block_number = t.block_number + _inserted_timestamp, + trace_json AS DATA, + sub_traces + FROM + cleaned_traces +), +new_records AS ( + SELECT + f.block_number, + t.tx_hash, + t.block_timestamp, + t.tx_status, + f.tx_position, + f.trace_index, + f.from_address, + f.to_address, + f.eth_value_precise_raw, + f.eth_value_precise, + f.eth_value, + f.gas, + f.gas_used, + f.input, + f.output, + f.type, + f.identifier, + f.sub_traces, + f.error_reason, + IFF( + f.error_reason IS NULL, + 'SUCCESS', + 'FAIL' + ) AS trace_status, + f.data, + IFF( + t.tx_hash IS NULL + OR t.block_timestamp IS NULL + OR t.tx_status IS NULL, + TRUE, + FALSE + ) AS is_pending, + f._call_id, + f._inserted_timestamp + FROM + final_traces f + LEFT OUTER JOIN {{ ref('silver__transactions') }} + t + ON f.tx_position = t.position + AND f.block_number = t.block_number {% if is_incremental() %} AND t._INSERTED_TIMESTAMP >= ( SELECT - MAX(_inserted_timestamp) :: DATE - 1 + DATEADD('hour', -24, MAX(_inserted_timestamp)) FROM - {{ this }} -) -{% endif %} + {{ this }}) + {% endif %} ) {% if is_incremental() %}, @@ -276,6 +291,8 @@ missing_data AS ( t.trace_index, t.from_address, t.to_address, + t.eth_value_precise_raw, + t.eth_value_precise, t.eth_value, t.gas, t.gas_used, @@ -314,6 +331,8 @@ FINAL AS ( trace_index, from_address, to_address, + eth_value_precise_raw, + eth_value_precise, eth_value, gas, gas_used, @@ -342,6 +361,8 @@ SELECT trace_index, from_address, to_address, + eth_value_precise_raw, + eth_value_precise, eth_value, gas, gas_used, @@ -369,6 +390,7 @@ SELECT trace_index, from_address, to_address, + eth_value_precise, eth_value, gas, gas_used, @@ -383,16 +405,7 @@ SELECT is_pending, _call_id, _inserted_timestamp, - IFNULL( - utils.udf_hex_to_int( - DATA :value :: STRING - ), - '0' - ) AS eth_value_precise_raw, - utils.udf_decimal_adjust( - eth_value_precise_raw, - 18 - ) AS eth_value_precise + eth_value_precise_raw FROM FINAL qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index ORDER BY diff --git a/models/silver/core/silver__traces2.sql b/models/silver/core/silver__traces2.sql deleted file mode 100644 index 6bfb3f16..00000000 --- a/models/silver/core/silver__traces2.sql +++ /dev/null @@ -1,410 +0,0 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} -{{ config ( - materialized = "incremental", - incremental_strategy = 'delete+insert', - unique_key = "block_number", - cluster_by = "block_timestamp::date, _inserted_timestamp::date", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION" -) }} - -WITH bronze_traces AS ( - - SELECT - block_number, - VALUE :array_index :: INT AS tx_position, - DATA :result AS full_traces, - _inserted_timestamp - FROM - -{% if is_incremental() %} -{{ ref('bronze__streamline_traces') }} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) - AND DATA :result IS NOT NULL -{% else %} - {{ ref('bronze__streamline_FR_traces') }} -WHERE - _partition_by_block_id <= 2300000 - AND DATA :result IS NOT NULL -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position -ORDER BY - _inserted_timestamp DESC)) = 1 -), -flatten_traces AS ( - SELECT - block_number, - tx_position, - IFF( - path IN ( - 'result', - 'result.value', - 'result.type', - 'result.to', - 'result.input', - 'result.gasUsed', - 'result.gas', - 'result.from', - 'result.output', - 'result.error', - 'result.revertReason', - 'gasUsed', - 'gas', - 'type', - 'to', - 'from', - 'value', - 'input', - 'error', - 'output', - 'revertReason' - ), - 'ORIGIN', - REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') - ) AS trace_address, - _inserted_timestamp, - OBJECT_AGG( - key, - VALUE - ) AS trace_json, - CASE - WHEN trace_address = 'ORIGIN' THEN NULL - WHEN POSITION( - '_' IN trace_address - ) = 0 THEN 'ORIGIN' - ELSE REGEXP_REPLACE( - trace_address, - '_[0-9]+$', - '', - 1, - 1 - ) - END AS parent_trace_address, - SPLIT( - trace_address, - '_' - ) AS str_array - FROM - bronze_traces txs, - TABLE( - FLATTEN( - input => PARSE_JSON( - txs.full_traces - ), - recursive => TRUE - ) - ) f - WHERE - f.index IS NULL - AND f.key != 'calls' - AND f.path != 'result' - GROUP BY - block_number, - tx_position, - trace_address, - _inserted_timestamp -), -sub_traces AS ( - SELECT - block_number, - tx_position, - parent_trace_address, - COUNT(*) AS sub_traces - FROM - flatten_traces - GROUP BY - block_number, - tx_position, - parent_trace_address -), -num_array AS ( - SELECT - block_number, - tx_position, - trace_address, - ARRAY_AGG(flat_value) AS num_array - FROM - ( - SELECT - block_number, - tx_position, - trace_address, - IFF( - VALUE :: STRING = 'ORIGIN', - -1, - VALUE :: INT - ) AS flat_value - FROM - flatten_traces, - LATERAL FLATTEN ( - input => str_array - ) - ) - GROUP BY - block_number, - tx_position, - trace_address -), -cleaned_traces AS ( - SELECT - b.block_number, - b.tx_position, - b.trace_address, - IFNULL( - sub_traces, - 0 - ) AS sub_traces, - num_array, - ROW_NUMBER() over ( - PARTITION BY b.block_number, - b.tx_position - ORDER BY - num_array ASC - ) - 1 AS trace_index, - trace_json, - b._inserted_timestamp - FROM - flatten_traces b - LEFT JOIN sub_traces s - ON b.block_number = s.block_number - AND b.tx_position = s.tx_position - AND b.trace_address = s.parent_trace_address - JOIN num_array n - ON b.block_number = n.block_number - AND b.tx_position = n.tx_position - AND b.trace_address = n.trace_address -), -final_traces AS ( - SELECT - tx_position, - trace_index, - block_number, - trace_address, - trace_json :error :: STRING AS error_reason, - trace_json :from :: STRING AS from_address, - trace_json :to :: STRING AS to_address, - IFNULL( - utils.udf_hex_to_int( - trace_json :value :: STRING - ), - '0' - ) AS eth_value_precise_raw, - utils.udf_decimal_adjust( - eth_value_precise_raw, - 18 - ) AS eth_value_precise, - eth_value_precise :: FLOAT AS eth_value, - utils.udf_hex_to_int( - trace_json :gas :: STRING - ) :: INT AS gas, - utils.udf_hex_to_int( - trace_json :gasUsed :: STRING - ) :: INT AS gas_used, - trace_json :input :: STRING AS input, - trace_json :output :: STRING AS output, - trace_json :type :: STRING AS TYPE, - concat_ws( - '_', - TYPE, - trace_address - ) AS identifier, - concat_ws( - '-', - block_number, - tx_position, - identifier - ) AS _call_id, - _inserted_timestamp, - trace_json AS DATA, - sub_traces - FROM - cleaned_traces -), -new_records AS ( - SELECT - f.block_number, - t.tx_hash, - t.block_timestamp, - t.tx_status, - f.tx_position, - f.trace_index, - f.from_address, - f.to_address, - f.eth_value_precise_raw, - f.eth_value_precise, - f.eth_value, - f.gas, - f.gas_used, - f.input, - f.output, - f.type, - f.identifier, - f.sub_traces, - f.error_reason, - IFF( - f.error_reason IS NULL, - 'SUCCESS', - 'FAIL' - ) AS trace_status, - f.data, - IFF( - t.tx_hash IS NULL - OR t.block_timestamp IS NULL - OR t.tx_status IS NULL, - TRUE, - FALSE - ) AS is_pending, - f._call_id, - f._inserted_timestamp - FROM - final_traces f - LEFT OUTER JOIN {{ ref('silver__transactions') }} - t - ON f.tx_position = t.position - AND f.block_number = t.block_number - -{% if is_incremental() %} -AND t._INSERTED_TIMESTAMP >= ( - SELECT - DATEADD('hour', -24, MAX(_inserted_timestamp)) - FROM - {{ this }}) - {% endif %} -) - -{% if is_incremental() %}, -missing_data AS ( - SELECT - t.block_number, - txs.tx_hash, - txs.block_timestamp, - txs.tx_status, - t.tx_position, - t.trace_index, - t.from_address, - t.to_address, - t.eth_value_precise_raw, - t.eth_value_precise, - t.eth_value, - t.gas, - t.gas_used, - t.input, - t.output, - t.type, - t.identifier, - t.sub_traces, - t.error_reason, - t.trace_status, - t.data, - FALSE AS is_pending, - t._call_id, - GREATEST( - t._inserted_timestamp, - txs._inserted_timestamp - ) AS _inserted_timestamp - FROM - {{ this }} - t - INNER JOIN {{ ref('silver__transactions') }} - txs - ON t.tx_position = txs.position - AND t.block_number = txs.block_number - WHERE - t.is_pending -) -{% endif %}, -FINAL AS ( - SELECT - block_number, - tx_hash, - block_timestamp, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - eth_value_precise_raw, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, - _inserted_timestamp - FROM - new_records - -{% if is_incremental() %} -UNION -SELECT - block_number, - tx_hash, - block_timestamp, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - eth_value_precise_raw, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, - _inserted_timestamp -FROM - missing_data -{% endif %} -) -SELECT - block_number, - tx_hash, - block_timestamp, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, - _inserted_timestamp, - eth_value_precise_raw -FROM - FINAL qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index -ORDER BY - _inserted_timestamp DESC, is_pending ASC)) = 1