Slowly changing dimensions (history tables)

Slowly Changing Dimensions Type 2 (SCD Type 2) is a pattern used in data warehouses to keep track of historical data by storing every version of a record. When a record changes, a new row is inserted into a history table instead of overwriting the existing one. This allows users to go back in time and see what the data looked like at a specific date and time.

Example table Customers:

Id    Name       City
1     ACME Inc   NY
2     Pepsi      WA
3     Cola       BR

Example history table for Customers using SCD Type 2, where the name of customer 1 was updated on January 15th from “ACME Ltd” to “ACME Inc”:

Id    Name       City    Timestamp      History id (UUID)
1     ACME Ltd   NY      2024-01-01     e341ac...
1     ACME Inc   NY      2024-01-15     6a52bb...
2     Pepsi      WA      2024-01-01     72bc34...
3     Cola       BR      2024-01-01     d383a2...

The “History id” column is the so-called “surrogate key”: it is unique for each row in the history table. The primary key of the original table (Id) is called the “natural key”; it is unique in the main table but, of course, not unique in the history table.
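For illustration, this query returns the full change history of customer 1 from the example table above: the natural key (Id) repeats across versions, while the surrogate key stays unique per row. Table and column names follow the example, not an actual Peliqan schema:

-- All stored versions of customer 1, oldest first
SELECT Id, Name, City, Timestamp
FROM customers_history
WHERE Id = 1
ORDER BY Timestamp ASC;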

We can implement this pattern in Peliqan with a low-code script; see below.

Make sure the script is executed after each refresh of the source table, for example on a regular schedule or as a step in a pipeline.

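# Note: 'pq' and 'st' (Streamlit) are provided by the Peliqan low-code
# scripting environment, so no imports are needed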
# Change these settings
db_name = pq.DW_NAME
schema_name = 'chargebee'
table_name = 'customers'
pk_field_name = 'customer_id'
timestamp_field_name = '_sdc_batched_at'

# No change needed below
history_schema = 'history'
history_table_name = table_name + "_history"
reset_state = False  # set to True to reprocess all rows from the source table

dw = pq.dbconnect(db_name)

# Resume from the last processed timestamp, stored as Peliqan state
last_processed = pq.get_state()
if not last_processed or reset_state:
    last_processed = '1900-01-01 00:00:00.000Z'
st.text("Last processed: " + str(last_processed))

# Fetch rows that changed since the last run, oldest first
query_changed_rows = f"SELECT * FROM {schema_name}.{table_name} WHERE {timestamp_field_name} > CAST('{last_processed}' AS TIMESTAMP) ORDER BY {timestamp_field_name} ASC"
df = dw.fetch(db_name, query=query_changed_rows, df=True)

if len(df.index) > 0:
    # The highest timestamp is in the last row because the query was ordered
    # by timestamp ascending; cast to string so it can be stored as state
    new_state = str(df.iloc[-1][timestamp_field_name])
    st.text("History rows to process: " + str(len(df.index)))
else:
    new_state = None
    st.text("No history to process")

# Add the timestamp to the PK to get a unique id per version in the Type 2
# history table; cast both to string because the PK may be numeric
df['_history_id'] = df[pk_field_name].astype(str) + '_' + df[timestamp_field_name].astype(str)

# Keep the version timestamp as an explicit column in the history table
# (the "Timestamp" column in the example above), then remove the remaining
# meta columns coming from the source table
df['_history_timestamp'] = df[timestamp_field_name]
df = df.drop(columns=['_sdc_received_at', '_sdc_sequence', '_sdc_table_version', '_sdc_batched_at'], errors='ignore')

# Write to the history table in batches of 100 rows; using '_history_id' as
# primary key makes the write idempotent if the same rows are processed again
history_rows = df.to_dict(orient='records')
batches = [history_rows[i:i+100] for i in range(0, len(history_rows), 100)]
for batch in batches:
    dw.write(history_schema, history_table_name, batch, pk='_history_id')

# Remember the highest processed timestamp for the next run
if new_state:
    pq.set_state(new_state)
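Once the history table is populated, it can answer point-in-time questions. A minimal sketch, assuming a Postgres-compatible warehouse that supports DISTINCT ON, and using the chargebee customers example from the settings above (customers_history and _history_timestamp are the names written by the script):

-- Hypothetical example: the customers table as it looked on 2024-01-10,
-- keeping only the latest version of each customer at or before that moment
SELECT DISTINCT ON (customer_id) *
FROM history.customers_history
WHERE _history_timestamp <= '2024-01-10 00:00:00'
ORDER BY customer_id, _history_timestamp DESC;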