Slowly Changing Dimensions Type 2 (SCD T2) is a pattern in data warehouses where you store every version of a record in order to keep track of historical data. In other words, when a record changes, we insert a new row into a history table instead of overwriting the existing one. This allows users to go back in time and see what the data looked like at a specific date and time.
Example table Customers:
Id  Name      City
1   ACME Inc  NY
2   Pepsi     WA
3   Cola      BR
Example history table for Customers using SCD T2, where the name of ACME was updated on the 15th of January from “ACME Ltd” to “ACME Inc”:
Id  Name      City  Timestamp   History id (UUID)
1   ACME Ltd  NY    2024-01-01  e341ac...
1   ACME Inc  NY    2024-01-15  6a52bb...
2   Pepsi     WA    2024-01-01  72bc34...
3   Cola      BR    2024-01-01  d383a2...
The “History id” column is the so-called “surrogate key”, which is unique for each row in the history table. The primary key of the original table (Id) is called the “natural key”: it is unique in the main table, but of course not unique in the history table. A point-in-time query then selects, per natural key, the latest version on or before the requested date, as shown in the sketch below.
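To make the “time travel” concrete, here is a minimal, self-contained pandas sketch using the illustrative history table above (lowercase column names assumed). It reconstructs the Customers table as it looked on 2024-01-10 by keeping, for each natural key, the latest version on or before that date:

import pandas as pd

# The illustrative history table from above (lowercase column names assumed)
history = pd.DataFrame([
    {"id": 1, "name": "ACME Ltd", "city": "NY", "timestamp": "2024-01-01"},
    {"id": 1, "name": "ACME Inc", "city": "NY", "timestamp": "2024-01-15"},
    {"id": 2, "name": "Pepsi",    "city": "WA", "timestamp": "2024-01-01"},
    {"id": 3, "name": "Cola",     "city": "BR", "timestamp": "2024-01-01"},
])
history["timestamp"] = pd.to_datetime(history["timestamp"])

# Reconstruct the table as of 2024-01-10: keep, per natural key (id),
# the latest version with a timestamp on or before that date
as_of = pd.Timestamp("2024-01-10")
snapshot = (
    history[history["timestamp"] <= as_of]
    .sort_values("timestamp")
    .groupby("id", as_index=False)
    .last()
)
print(snapshot)  # id 1 still shows "ACME Ltd", since the rename happened on Jan 15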
We can implement this pattern in Peliqan with the low-code script below.
Make sure the script is executed after each refresh of the source table, for example on a regular schedule or as a step in a pipeline.
# Change these settings
db_name = pq.DW_NAME
schema_name = 'chargebee'
table_name = 'customers'
pk_field_name = 'customer_id'
timestamp_field_name = '_sdc_batched_at'

# No change needed
history_schema = 'history'
history_table_name = table_name + "_history"
reset_state = False

dw = pq.dbconnect(db_name)

# The state holds the timestamp of the last processed row, so each run
# only picks up rows that changed since the previous run
last_processed = pq.get_state()
if not last_processed or reset_state:
    last_processed = '1900-01-01 00:00:00.000Z'
st.text("Last processed: " + str(last_processed))

query_changed_rows = f"SELECT * FROM {schema_name}.{table_name} WHERE {timestamp_field_name} > CAST('{last_processed}' AS TIMESTAMP) ORDER BY {timestamp_field_name} ASC"
df = dw.fetch(db_name, query=query_changed_rows, df=True)

if len(df.index) > 0:
    # Get the highest timestamp in the result; it is in the last row
    # because the query is ordered by timestamp ascending
    new_state = str(df.iloc[-1][timestamp_field_name])
    st.text("History rows to process: " + str(len(df.index)))

    # Add the timestamp to the PK to get a unique id per version in the T2 history table
    df['_history_id'] = df[pk_field_name].astype(str) + '_' + df[timestamp_field_name].astype(str)

    # Remove meta columns from the source table
    df = df.drop(columns=['_sdc_received_at', '_sdc_sequence', '_sdc_table_version', '_sdc_batched_at'])

    # Write to the history table in batches of 100 rows; using the surrogate
    # key _history_id as PK makes reruns idempotent (existing versions are upserted)
    history_rows = df.to_dict(orient='records')
    batches = [history_rows[i:i+100] for i in range(0, len(history_rows), 100)]
    for batch in batches:
        result = dw.write(history_schema, history_table_name, batch, pk='_history_id')

    # Only advance the state after the rows have been written
    pq.set_state(new_state)
else:
    st.text("No history to process")
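Once the history table is populated, it can be queried like any other table. The hedged sketch below reuses dw and the settings from the script above to fetch every stored version of one record; the id value '123' is a placeholder. Because _history_id is the natural key with the timestamp appended as a suffix, sorting on it lists one record's versions in chronological order.

# Usage sketch (assumes the script above has run; '123' is a placeholder id)
query_versions = f"SELECT * FROM {history_schema}.{history_table_name} " \
                 f"WHERE {pk_field_name} = '123' ORDER BY _history_id ASC"
df_versions = dw.fetch(db_name, query=query_versions, df=True)
st.text(df_versions.to_string())  # one row per stored version of the record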