Keeping track of what was processed
When you have many records to sync from a source to a destination, you want to avoid reprocessing all of them on each run of your scheduled script. Instead, you only want to process new and updated records.
Incremental processing using a timestamp
You can process records incrementally by filtering on a "last update" timestamp. By storing the highest timestamp that was already processed in the script's state, you implement a stateful script that only processes new and updated records on each run.
Here’s an example script using a timestamp stored in the state of the script:
previous_timestamp = pq.get_state()
if not previous_timestamp:
    previous_timestamp = "1970-01-01 00:00:00"

dbconn = pq.dbconnect(pq.DW_NAME)
my_query = "SELECT * FROM orders WHERE timestamp_last_update > '%s' ORDER BY timestamp_last_update" % previous_timestamp
source_records = dbconn.fetch(pq.DW_NAME, query = my_query)

highest_timestamp_processed = previous_timestamp
for source_record in source_records:
    odoo.upsert("order", reference = source_record["code"], ...)
    highest_timestamp_processed = source_record["timestamp_last_update"]

if len(source_records):
    # take the timestamp of the last source record that was processed
    pq.set_state(highest_timestamp_processed)
# Note: the timestamp column is usually _sdc_batched_at for target tables,
# _sdc_batched_at is the timestamp the record was added or updated in the DWH
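The core of this pattern, stripped of the Peliqan and Odoo calls, can be sketched in plain Python (the function and record names are illustrative, and the state is kept in a local variable instead of pq.get_state / pq.set_state):

```python
# Minimal sketch of timestamp-based incremental processing.

def process_incrementally(records, previous_timestamp):
    """Process records newer than previous_timestamp; return the new state."""
    # Filter and sort, like the WHERE / ORDER BY clauses in the query above.
    new_records = sorted(
        (r for r in records if r["timestamp_last_update"] > previous_timestamp),
        key=lambda r: r["timestamp_last_update"],
    )
    for record in new_records:
        pass  # write the record to the destination here
    if new_records:
        # The highest timestamp processed becomes the state for the next run.
        previous_timestamp = new_records[-1]["timestamp_last_update"]
    return previous_timestamp

records = [
    {"code": "A1", "timestamp_last_update": "2023-07-01 09:00:00"},
    {"code": "B2", "timestamp_last_update": "2023-07-02 10:30:00"},
]
state = process_incrementally(records, "1970-01-01 00:00:00")
print(state)  # → 2023-07-02 10:30:00
```

Running the function again with the returned state processes nothing, which is exactly the behavior the stateful script relies on between scheduled runs.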
Incremental processing by comparing source & destination
This pattern is more robust than the timestamp pattern: with a timestamp, each record is processed only once, so if the writeback fails for a specific record, that record will never be picked up again on future runs.
An alternative pattern is to compare the data from source & destination using a JOIN query in Peliqan first, and using the result of that JOIN query as input for the sync script to process records.
Example JOIN query to compare data from a source and a destination:
SELECT
    source_order.id,
    source_order.reference,
    source_order.price,
    source_order.item_count,
    destination_order.id,
    destination_order.code,
    destination_order.total_price,
    destination_order.number_of_items
FROM source_order
LEFT JOIN destination_order
    ON source_order.reference = destination_order.code -- fields to match on
WHERE
    destination_order.id IS NULL OR -- include records that do not exist in the destination
    (source_order.price <> destination_order.total_price OR
     source_order.item_count <> destination_order.number_of_items) -- include source records that differ from the destination
The result of this JOIN only includes the records that need to be processed; these can now be picked up by a sync script.
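The comparison the JOIN performs can also be sketched in plain Python, which makes it easy to see which records get selected. This is an in-memory illustration, not the Peliqan API; the field names mirror the example query above:

```python
# In-memory sketch of the source/destination comparison done by the JOIN.

def records_to_sync(source_rows, destination_rows):
    """Return source rows that are missing in, or different from, the destination."""
    dest_by_code = {d["code"]: d for d in destination_rows}
    to_sync = []
    for s in source_rows:
        d = dest_by_code.get(s["reference"])
        if d is None:
            to_sync.append(s)  # record does not exist in the destination
        elif s["price"] != d["total_price"] or s["item_count"] != d["number_of_items"]:
            to_sync.append(s)  # record differs from the destination
    return to_sync

source = [
    {"reference": "A1", "price": 10.0, "item_count": 2},
    {"reference": "B2", "price": 20.0, "item_count": 1},
    {"reference": "C3", "price": 30.0, "item_count": 5},
]
destination = [
    {"code": "A1", "total_price": 10.0, "number_of_items": 2},  # in sync: skipped
    {"code": "B2", "total_price": 25.0, "number_of_items": 1},  # price differs
]

print([r["reference"] for r in records_to_sync(source, destination)])  # → ['B2', 'C3']
```

Because a failed writeback leaves the destination unchanged, the same record simply shows up again in the next comparison, which is what makes this pattern self-healing.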
Getting and setting state in a script
A script is stateful when it keeps track of its state across runs.
# Get state from previous run
previous_state = pq.get_state()
# Save state for the next run
pq.set_state(current_state)
The state that you set (save) can be a string, a number, or a dict:
- String: e.g. store the guid of the last processed record (assuming records are sorted) or store the highest timestamp processed
- Number: e.g. store the id of the last processed record (assuming records are sorted)
- Dict: store multiple states, e.g. one for each table
Example of a dict state with a timestamp stored per table:
{
"contacts": "2023-07-25T10:10:48+00:00",
"companies": "2023-07-23T10:11:23+00:00"
}
When the state is a dict and you update it, the new dict is merged into the stored state. Example update:
pq.set_state({"contacts": "2023-07-25T10:10:48+00:00"})
# The new stored state will have the updated key for "contacts"
# and it will still include other keys as well, e.g.:
{
"contacts": "2023-07-25T10:10:48+00:00",
"companies": "2023-07-23T10:11:23+00:00"
}
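Assuming the merge is shallow (keys are replaced or added at the top level), the behavior can be modeled with a plain Python dict standing in for the stored state:

```python
# Sketch of the described merge behavior, with a plain dict
# in place of Peliqan's stored state (a shallow merge is assumed).

stored_state = {
    "contacts": "2023-07-20T08:00:00+00:00",
    "companies": "2023-07-23T10:11:23+00:00",
}

def set_state(update):
    """Merge the update into the stored state, like pq.set_state with a dict."""
    stored_state.update(update)

set_state({"contacts": "2023-07-25T10:10:48+00:00"})
print(stored_state["contacts"])   # → 2023-07-25T10:10:48+00:00 (updated)
print(stored_state["companies"])  # → 2023-07-23T10:11:23+00:00 (still present)
```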
One-time syncs
This is a special pattern where records are flagged as "processed" in a table so that they are processed only once. It only works if no updates are needed after the one-time processing of a record.
Here's an example where we added a column called "sync_status" to a table, and we use this column to keep track of rows that have already been processed. On each run of the script, rows that were already processed are skipped:
dbconn = pq.dbconnect(pq.DW_NAME)
rows = dbconn.fetch(pq.DW_NAME, 'shop', 'products')
for row in rows:
    if not row["sync_status"]: # skip rows that were already processed
        odoo.update("product", id = row["product_id"], name = row["name"])
        # write status
        pq.update_cell(
            table_name = "products",
            field_name = "sync_status",
            row_id = row["id"], # must be a value from the primary key column
            value = "Processed")
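The flagging logic above can be sketched as a self-contained Python function, with rows as plain dicts instead of warehouse rows (the names are illustrative):

```python
# Sketch of one-time sync using a "sync_status" flag.

def run_sync(rows):
    """Process unflagged rows once and flag them as processed."""
    processed = []
    for row in rows:
        if not row["sync_status"]:  # skip rows that were already processed
            processed.append(row["product_id"])  # destination write goes here
            row["sync_status"] = "Processed"     # flag the row, like pq.update_cell
    return processed

rows = [
    {"product_id": 1, "name": "Chair", "sync_status": None},
    {"product_id": 2, "name": "Desk", "sync_status": "Processed"},
]
print(run_sync(rows))  # → [1]  first run processes only unflagged rows
print(run_sync(rows))  # → []   second run: everything is already flagged
```

Note that the flag is written per row, so a failure partway through leaves already-flagged rows skipped and unflagged rows eligible on the next run.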