
What is incremental_update?

incremental_update is a mechanism that reduces processing time by reusing the results of the previous Unification run and applying stitching only to updated records.

How does incremental_update improve efficiency?

  1. When incremental_columns: [time] is specified, the Unification Algorithm combines the previous final graph with the graph generated from newly added records. This allows the process to start from a state where most of the stitching is already complete, leading to faster convergence.
  2. When incremental_columns: [time] is specified, the enrichment process applies the canonical_id only to the delta records. This significantly shortens processing time compared to enriching the canonical_id for all records.
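To make point 1 concrete, here is a minimal union-find sketch. This is not the actual Unification Algorithm, and the edge pairings among the ids are illustrative; it only shows why seeding the stitching with the previous run's final graph leaves just the delta edges to merge:

```python
# Illustrative sketch (not Treasure Data's implementation) of incremental
# stitching: the previous final graph is already merged, so only edges
# from newly added records need processing.

def find(parent, x):
    # Follow parent pointers to the representative (leader) of x's group.
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path compression
        x = parent[x]
    return x

def union(parent, a, b):
    parent.setdefault(a, a)
    parent.setdefault(b, b)
    ra, rb = find(parent, a), find(parent, b)
    if ra != rb:
        parent[max(ra, rb)] = min(ra, rb)  # smaller id stays the leader

# Final graph from the previous run: everything already stitched to aaa_001.
previous_graph = {"aaa_001": "aaa_001", "xxx_001": "aaa_001", "yyy_001": "aaa_001"}

# Edges coming only from the newly added (delta) records.
delta_edges = [("yyy_001", "zzz_007"), ("zzz_007", "aaa_004")]

parent = dict(previous_graph)
for a, b in delta_edges:
    union(parent, a, b)

# The new ids join the existing cluster without re-stitching old records.
print(sorted({find(parent, k) for k in parent}))  # ['aaa_001']
```

Because the old records are already collapsed into one leader, only two union operations are needed regardless of how many historical records exist.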

Dataset

Using the table from Example1 as a base, we will insert records during the incremental_update process and observe how the update is applied.

Table Used in Example1

Assume the data from Example1 is stored in the test_id_unification_ex5 database.

| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 5 | aaa_001 | 3rd_001 | | | yyy_001 | 3rd_001 | | |
| 1 | 5 | aaa_001 | 3rd_002 | | | | | zzz_001 | 3rd_002 |
| 2 | 5 | aaa_001 | 3rd_003 | | | | | | |
| 2 | 5 | aaa_001 | 3rd_004 | xxx_001 | 3rd_004 | | | | |
| 1 | 5 | | | xxx_001 | 3rd_005 | yyy_002 | 3rd_005 | | |
| 2 | 5 | | | | | yyy_002 | 3rd_006 | zzz_003 | 3rd_006 |
| 3 | 5 | | | | | | | zzz_003 | 3rd_007 |
| 1 | 5 | | | xxx_002 | 3rd_008 | | | zzz_003 | 3rd_008 |
| 2 | 5 | aaa_002 | 3rd_009 | xxx_002 | 3rd_009 | | | | |
| 4 | 5 | aaa_002 | 3rd_010 | | | yyy_003 | 3rd_010 | | |
| 1 | 5 | | | | | yyy_003 | 3rd_011 | zzz_004 | 3rd_011 |
| 2 | 5 | | | xxx_003 | 3rd_012 | | | zzz_004 | 3rd_012 |
| 5 | 5 | aaa_003 | 3rd_013 | xxx_003 | 3rd_013 | | | | |
| 1 | 5 | aaa_003 | 3rd_014 | | | | | | |
| 2 | 5 | aaa_003 | 3rd_015 | | | yyy_004 | 3rd_015 | zzz_005 | 3rd_015 |
| 6 | 5 | aaa_003 | 3rd_016 | xxx_004 | 3rd_016 | | | | |
| 1 | 5 | | | xxx_004 | 3rd_017 | | | zzz_005 | 3rd_017 |
| 2 | 5 | | | | | yyy_005 | 3rd_018 | zzz_005 | 3rd_018 |

Records to be Added Later

| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 7 | 5 | aaa_004 | 3rd_017 | xxx_005 | 3rd_018 | yyy_006 | 3rd_019 | zzz_006 | 3rd_018 |
| 1 | 5 | aaa_004 | 3rd_018 | | | | | zzz_007 | 3rd_018 |
| 2 | 5 | | | | | | | | |
| 8 | 5 | aaa_005 | 3rd_018 | xxx_005 | 3rd_018 | | | | |
| 1 | 5 | | | xxx_006 | 3rd_019 | yyy_006 | 3rd_019 | zzz_008 | 3rd_019 |
| 2 | 5 | aaa_005 | 3rd_019 | | | yyy_007 | 3rd_019 | | |

We will add the above records midway and observe the incremental_update process.

Workflow example

id_unification_ex5.dig

timezone: UTC # Asia/Tokyo
schedule:
  daily>: 09:00:00

+call_unification:
  http_call>: https://api-cdp.treasuredata.com/unifications/workflow_call
  headers:
    - authorization: ${secret:td.apikey}
  method: POST
  retry: true
  content_format: json
  content:

    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true

    full_refresh: false
    keep_debug_tables: true

    unification:
      !include : unification_ex5.yml

unification_ex5.yml

name: test_id_unification_ex5

keys:
  - name: td_client_id
  - name: td_global_id

tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_xxx
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

  - database: test_id_unification_ex5
    table: site_yyy
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

  - database: test_id_unification_ex5
    table: site_zzz
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3

master_tables:
  - name: master_table_ex5
    canonical_id: unified_cookie_id
    attributes:
    - name: td_client_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}
    - name: td_global_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}

Settings for incremental_update

full_refresh: false

  content:

    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true

    full_refresh: false
    keep_debug_tables: true

    unification:
      !include : unification_ex5.yml

By setting full_refresh: false in the content: section of the dig file, you enable the configuration for incremental_update. However, note that incremental_update is not performed on a daily basis. Instead, a full_refresh (processing all records as usual) is performed once every three days.
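The scheduling internals are not exposed, but the documented cadence can be pictured with this rough model (the day counter below is a hypothetical input, not a real workflow parameter):

```python
# Model of the documented behavior: with full_refresh: false, a full
# refresh still runs once every three days, with incremental updates
# on the days in between.

def run_mode(days_since_last_full_refresh: int) -> str:
    # Incremental updates in between, a full refresh every third day.
    if days_since_last_full_refresh >= 3:
        return "full_refresh"
    return "incremental_update"

print([run_mode(d) for d in (1, 2, 3)])
# ['incremental_update', 'incremental_update', 'full_refresh']
```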

incremental_columns:

Warning

Currently, any setting other than incremental_columns: [time] will result in all processes being executed as full_refresh. Avoid using other configurations.

tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

In the tables: section of the YAML file, you can configure incremental_columns: [column1, column2,...] for each table. When the +get_next_high_water_mark task records where processing left off, the order specified in this option will be used for sorting:

ORDER BY column1 DESC, column2 DESC,...

The first row in this sorted order (the most recent record) will have its column1, column2,... values recorded.

If this option is not set, the +get_next_high_water_mark task will not run. The Unification Algorithm will operate in a manner similar to full_refresh, and all records in the enriched_ tables will be replaced.
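As an illustration of this sort-and-record behavior, here is a small sketch. The record layout and values are made up, and the real task runs as a query against the table rather than Python, but the logic is the same: take the first row under `ORDER BY ... DESC`, record its incremental-column values, and on the next run process only records past that mark:

```python
# Hedged sketch of a high-water-mark over incremental_columns.

incremental_columns = ["time"]

records = [
    {"time": 100, "td_client_id": "aaa_001"},
    {"time": 300, "td_client_id": "aaa_002"},
    {"time": 200, "td_client_id": "aaa_003"},
]

# ORDER BY time DESC ... take the first row: the most recent record
# defines the high water mark.
newest = max(records, key=lambda r: tuple(r[c] for c in incremental_columns))
high_water_mark = {c: newest[c] for c in incremental_columns}
print(high_water_mark)  # {'time': 300}

# On the next run, only records past the mark are treated as the delta.
new_records = [{"time": 250, "td_client_id": "aaa_004"},
               {"time": 350, "td_client_id": "aaa_005"}]
delta = [r for r in new_records
         if tuple(r[c] for c in incremental_columns)
         > tuple(high_water_mark[c] for c in incremental_columns)]
print([r["td_client_id"] for r in delta])  # ['aaa_005']
```

Note in the sketch that a late-arriving record with a timestamp below the mark (aaa_004) is skipped, which connects to the limitation described at the end of this page.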

incremental_columns: [time]

When incremental_columns: [time] is specified (only the time column is defined), the process qualifies as incremental_update. The Unification Algorithm considers only newly added records, and instead of replacing entire enriched_ tables, enrichment is performed on newly added records and appended to the table. As of now, only the [time] setting enables efficient incremental updates, so keep this in mind.

incremental_merge_iterations:

canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3

In the canonical_ids: configuration, the number of loop iterations used during an incremental_update can be specified with incremental_merge_iterations:. If not set, the default is 2.

How to Verify if incremental_update Was Performed

A full_refresh is performed once every three days, while incremental_update is executed on the other days. On the second or third day of the scheduled execution, check the most recent sessions (history) of the workflow (WF) using the methods below to identify sessions where incremental_update was performed.

Method 1: Check the +extract_and_merge Task

drop table if exists "unified_cookie_id_graph_unify_loop_0";
create table "unified_cookie_id_graph_unify_loop_0" with (bucketed_on = array['follower_id'], bucket_count = 512) as
-- incremental extraction and merge to the previous graph
select
  coalesce(prev.follower_id, next.follower_id) as follower_id,
  coalesce(prev.follower_ns, next.follower_ns) as follower_ns,

If the comment just above the select statement reads:

-- incremental extraction and merge to the previous graph

then this WF session performed an incremental_update. Conversely, for full_refresh, the comment will read:

-- full extraction

Method 2: Check if the +source_key_stats Task Was Executed


In the TIMELINE or TASKS section of the WF session, if the +source_key_stats: task was executed, it indicates a full_refresh. If this task was not executed, it signifies an incremental_update.

On days when incremental_update is executed, you can add new records using the SQL for Appending Records and rerun the WF to confirm the processing of newly added records during the incremental_update.

Explanation of the Unification Algorithm

graph_unify_loop_0

In an incremental_update, the creation of the initial graph differs. The initial graph is formed by combining the following two graphs:

  1. The graph for newly added records
  2. The final graph from the previous execution

1. The Graph for Newly Added Records

(Figure: the graph built from the newly added records)

2. The Final Graph from the Previous Execution

(Figure: the final graph from the previous execution)

graph_unify_loop_0

The initial graph for incremental_update is the result of combining these two graphs.

(Figure: the combined initial graph)

Starting from this graph allows the algorithm to converge in fewer iterations. In most cases, the leader replacement in the first loop already resolves the new records to aaa_001 (see Example2), leaving the graph close to convergence.
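A highly simplified sketch of this leader-replacement step (the ids below are illustrative, and the real algorithm does more than this single rule): each pass repoints every follower to its leader's leader, so a graph that is already mostly stitched converges almost immediately.

```python
# Simplified leader replacement: repoint each follower to its leader's
# leader, keeping the smaller id as the winner (aaa_001 in the examples).

def one_iteration(leader: dict) -> dict:
    return {f: min(l, leader[l]) for f, l in leader.items()}

# Combined initial graph: delta records hang off already-stitched ids.
leader = {
    "aaa_001": "aaa_001",
    "yyy_005": "aaa_001",  # from the previous final graph
    "zzz_005": "aaa_001",
    "aaa_004": "zzz_005",  # new record attached to an old id
}

after_one = one_iteration(leader)
print(after_one["aaa_004"])  # aaa_001: converged in a single iteration
```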

graph_unify_loop_1 (and Similar Graphs for 2, etc.)

(Figure: the graph after the first merge iteration)

In practice, it can be confirmed that the graph converges in the first iteration.

Output

master_table

Since the newly added records also belong to the same individual, the master_table consists of a single record. The canonical_id is generated based on aaa_001, so its value remains unchanged from the previous execution.

Result Example

| unified_cookie_id | td_client_id | td_global_id |
|---|---|---|
| Su-bHvUu9NN_ | ["yyy_007", "aaa_005", "xxx_006", "yyy_006", "zzz_008"] | ["3rd_019", "3rd_019", "3rd_019", "3rd_019", "3rd_019"] |

enriched_ Table

Under the incremental_columns: [time] configuration, an incremental_update does not replace the entire enriched_ table in order to assign canonical_id to every record. Instead, canonical_id is assigned only to the updated records, which are appended to the existing enriched_ table. This drastically reduces the time required for the enrichment task.
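The append-versus-replace difference can be sketched as follows (the table contents and the canonical_of mapping are made up for illustration; the actual enrichment runs as queries, not Python):

```python
# Hedged sketch: incremental enrichment assigns canonical_id only to the
# delta records and appends them; existing enriched_ rows are untouched.

canonical_of = {"aaa_001": "Su-bHvUu9NN_", "aaa_004": "Su-bHvUu9NN_"}

enriched = [  # existing enriched_ table from previous runs
    {"td_client_id": "aaa_001", "canonical_id": "Su-bHvUu9NN_"},
]
delta = [{"td_client_id": "aaa_004"}]  # newly added records only

enriched.extend(
    {**r, "canonical_id": canonical_of[r["td_client_id"]]} for r in delta
)
print(len(enriched))  # 2 rows: one kept as-is, one appended
```

A full_refresh, by contrast, would rebuild all rows of `enriched` from scratch, which is why its cost grows with the whole history rather than with the delta.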

Limitation for incremental_update

When Records from the Past are Deleted or Updated

With incremental_update, deletions or updates to past records are not immediately reflected in the ID unification, so this requires careful attention. However, since a full_refresh is performed once every three days, those changes will be picked up at that time.