Slow changing dimensions (history tables)

Slow Changing Dimensions Type 2 (SCD T2) is a pattern in data warehouses where you store every version of a record, in order to keep track of historic data. In other words, when a record changes, we insert a new row in a history table. This allows users to go back in time and see what the data looked like on a specific date & time.

Example table Customers:

Id    Name       City
1     ACME Inc   NY
2     Pepsi      WA
3     Cola       BR

Example history table for Customers using SCD T2 where the name of ACME was updated on 15th of Jan from “ACME Ltd” to “ACME Inc”:

Id    Name       City    Timestamp      History id (UUID)
1     ACME Ltd   NY      2024-01-01     e341ac...
1     ACME Inc   NY      2024-01-15     6a52bb...
2     Pepsi      WA      2024-01-01     72bc34...
3     Cola       BR      2024-01-01     d383a2...

We can implement this pattern in Peliqan with a small low-code script. It's the magic of the function dw.write() that makes this implementation so simple. See source code below.


Note that this is only a boilerplate script, more logic needs to be added to make it work at scale:

  • Process source data incrementally: use pq.get_state() and pq.set_state() to keep track of the last timestamp processed from the source (e.g. based on a timestamp column).
  • Schedule this script: make sure the script is executed after each refresh of the source table.
  • Potentially fetch historic rows in batch based on the PK values of the source records that were fetched incrementally, to avoid individual lookups in the history table.

import uuid

dbconn = pq.dbconnect(connection_name)

# Change these settings
connection_name = 'dw_123' # See "Data warehouse" under My Connections
db_name = 'dw_123'
schema_name = 'chargebee'
table_name = 'transactions'
pk_field_name = 'transaction_id'

# No change needed
history_schema_name = 'history'
history_tabl_name = table_name + '_history'
history_pk_field_name = 'history_id'

def get_history_row(row):
    query = (
        "SELECT * FROM " + history_tabl_name + " WHERE " + pk_field_name + " = '" + 
        row[pk_field_name] + "' ORDER BY _sdc_batched_at DESC"
    history_rows = dbconn.fetch(db_name, history_schema_name, query = query)
    if history_rows:
        return history_rows[0]
        return None

def is_different(current_row, history_row):
    for key, val in current_row.items():
        if key[0] != '_':  # exclude metada (columns _sdc_...)
            if key not in history_row:
                return True
            if history_row[key] != val:
                return True
    return False

def write_history(row):
    row[history_pk_field_name] = str(uuid.uuid4())        
    result = dbconn.write(history_schema_name, history_tabl_name, row, pk = history_pk_field_name)
    if result['status'] != 'success':

rows = dbconn.fetch(db_name, schema_name, table_name)
for row in rows:
    history_row = get_history_row(row)
    if not history_row or is_different(row, history_row):