Incremental processing

Keeping track of what was processed

When you have a lot of records to sync from a source to a destination, you want to avoid processing all of them on each run of your scheduled script. Instead, you want to process only new and updated records.

Incremental processing using a timestamp

You can process records incrementally by filtering on a “last update” timestamp. By storing the highest timestamp that was already processed, you implement a stateful script that only processes new and updated records on each run.

Here’s an example script using a timestamp stored in the state of the script:

previous_timestamp = pq.get_state()

# First run: no state stored yet, so start from the epoch
if not previous_timestamp:
  previous_timestamp = "1970-01-01 00:00:00"

dbconn = pq.dbconnect('dw_123')
my_query = ("SELECT * FROM orders WHERE timestamp_last_update > '%s' "
            "ORDER BY timestamp_last_update" % previous_timestamp)
source_records_to_process = dbconn.fetch('dw_123', query = my_query)

for source_record in source_records_to_process:
   odoo.upsert("order", reference = source_record["code"], ...)
   # Records are sorted by timestamp, so the last one processed is the highest
   highest_timestamp_processed = source_record["timestamp_last_update"]
   # Save the timestamp so the next run starts after this record
   pq.set_state(highest_timestamp_processed)
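The same high-water-mark idea can be tried outside Peliqan. The following is a minimal sketch, not Peliqan's API: it assumes an in-memory SQLite table standing in for `orders`, a plain dict standing in for the script state, and a hypothetical `process_new_orders` helper. It also passes the timestamp as a bound query parameter instead of formatting it into the SQL string.

```python
import sqlite3

EPOCH = "1970-01-01 00:00:00"

def process_new_orders(conn, state):
    """Process rows newer than the stored timestamp; return the codes handled."""
    previous_timestamp = state.get("orders") or EPOCH
    rows = conn.execute(
        "SELECT code, timestamp_last_update FROM orders "
        "WHERE timestamp_last_update > ? ORDER BY timestamp_last_update",
        (previous_timestamp,),
    ).fetchall()
    processed = []
    for code, timestamp in rows:
        processed.append(code)       # stand-in for the real writeback call
        state["orders"] = timestamp  # rows are sorted, so this is the highest so far
    return processed

# Demo data (hypothetical)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (code TEXT, timestamp_last_update TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [
    ("A1", "2023-07-01 09:00:00"),
    ("A2", "2023-07-02 09:00:00"),
])

state = {}
first_run = process_new_orders(conn, state)    # picks up A1 and A2
second_run = process_new_orders(conn, state)   # nothing new since the last run
conn.execute("INSERT INTO orders VALUES ('A3', '2023-07-03 09:00:00')")
third_run = process_new_orders(conn, state)    # picks up only A3
```

Comparing timestamps as strings works here only because every value uses the same sortable format.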


Incremental processing by comparing source & destination

This pattern is more advanced and more robust than the timestamp pattern. With a timestamp, each record is processed only once: if the writeback fails for a specific record, the stored timestamp still moves past it and the record will not be picked up again in a later run.
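To make the failure case concrete, here is a small sketch in plain Python. The records and the `write_to_destination` function are hypothetical, and the function fails on purpose for one record. The script records the error and moves on, the stored timestamp advances past the failed record, and the next run never sees it again.

```python
records = [
    {"code": "A1", "timestamp_last_update": "2023-07-01 09:00:00"},
    {"code": "A2", "timestamp_last_update": "2023-07-02 09:00:00"},
    {"code": "A3", "timestamp_last_update": "2023-07-03 09:00:00"},
]

def write_to_destination(record):
    # Hypothetical writeback that fails for one specific record
    if record["code"] == "A2":
        raise RuntimeError("destination rejected the record")

def run(state):
    """One scheduled run: returns (written, failed) lists of record codes."""
    written, failed = [], []
    last = state.get("timestamp", "")
    for record in sorted(records, key=lambda r: r["timestamp_last_update"]):
        if record["timestamp_last_update"] <= last:
            continue  # already covered by a previous run
        try:
            write_to_destination(record)
            written.append(record["code"])
        except RuntimeError:
            failed.append(record["code"])
        # The timestamp advances even when the writeback failed
        state["timestamp"] = record["timestamp_last_update"]
    return written, failed

state = {}
first = run(state)    # A2 fails, A1 and A3 are written
second = run(state)   # A2 is not retried: its timestamp is below the stored one
```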

An alternative pattern is to first compare the data from source and destination with a JOIN query in Peliqan, and then use the result of that query as input for the sync script.

Example JOIN query to compare data from a source and a destination:

SELECT source_order.*
FROM source_order
LEFT JOIN destination_order
  ON source_order.reference = destination_order.code -- fields to match on
WHERE destination_order.code IS NULL OR -- include records that do not exist in the destination
  (source_order.price <> destination_order.total_price OR
   source_order.item_count <> destination_order.number_of_items) -- include source records that differ from the destination

The result of this JOIN only includes records that need to be processed. These can now be picked up by a sync script.
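The comparison can also be tried locally. This sketch is only a stand-in for a Peliqan query: it uses Python's `sqlite3` module with two small in-memory tables named after the example above, the rows are invented, and a record is selected when it is missing in the destination or when either compared field differs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_order (reference TEXT, price REAL, item_count INTEGER);
CREATE TABLE destination_order (code TEXT, total_price REAL, number_of_items INTEGER);
INSERT INTO source_order VALUES ('SO-1', 10.0, 1), ('SO-2', 25.0, 3), ('SO-3', 40.0, 2);
INSERT INTO destination_order VALUES ('SO-1', 10.0, 1), ('SO-2', 20.0, 3);
""")

query = """
SELECT source_order.reference
FROM source_order
LEFT JOIN destination_order
  ON source_order.reference = destination_order.code
WHERE destination_order.code IS NULL          -- missing in the destination
   OR source_order.price <> destination_order.total_price
   OR source_order.item_count <> destination_order.number_of_items
ORDER BY source_order.reference
"""

# SO-1 is identical on both sides, SO-2 has a different price, SO-3 is missing
to_process = [row[0] for row in conn.execute(query)]
```

Because the query is re-evaluated on every run, a record whose writeback failed still differs from the destination and is selected again next time.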

Getting and setting state in a script

A script is stateful when it keeps track of its state across runs.

# Get state from previous run
previous_state = pq.get_state()

# Save state for the next run
pq.set_state(current_state)

The state itself that you set (save) can be a string, number or a dict:

  • String: e.g. store the guid of the last processed record (assuming records are sorted) or store the highest timestamp processed
  • Number: e.g. store the id of the last processed record (assuming records are sorted)
  • Dict: store multiple states, e.g. one for each table

Example of a dict state with a timestamp stored per table:

{
  "contacts": "2023-07-25T10:10:48+00:00",
  "companies": "2023-07-23T10:11:23+00:00"
}

When the stored state is a dict and you set a new dict, the two are merged: the keys you pass are updated and all other keys are kept. Example update:

pq.set_state({"contacts": "2023-07-25T10:10:48+00:00"})

# The new stored state will have the updated key for "contacts"
# and it will still include other keys as well, e.g.:
{
  "contacts": "2023-07-25T10:10:48+00:00",
  "companies": "2023-07-23T10:11:23+00:00"
}
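The merge behaviour described above can be pictured as a shallow dict update. The sketch below is plain Python, not the Peliqan implementation: `merge_state` is a hypothetical helper, and it assumes that top-level keys are merged while each value is replaced as a whole.

```python
def merge_state(stored, update):
    """Return a new dict: keys in `update` overwrite, all other keys are kept."""
    merged = dict(stored)
    merged.update(update)
    return merged

stored = {
    "contacts": "2023-07-24T08:00:00+00:00",
    "companies": "2023-07-23T10:11:23+00:00",
}

# Only "contacts" is passed, so "companies" keeps its previous value
new_state = merge_state(stored, {"contacts": "2023-07-25T10:10:48+00:00"})
```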

One-time syncs

This is a special pattern where records are flagged as “processed” in a table so that they are only processed once. This pattern only works if no updates are needed after the one-time processing of a record.

Here’s an example where we added a column called “sync_status” to a table, and we use this column to keep track of rows that are already processed. On each run of the script, rows that were already processed are skipped:

dbconn = pq.dbconnect('dw_123')
rows = dbconn.fetch('dw_123', 'shop', 'products')

for row in rows:
   if not row["sync_status"]: # skip rows that were already processed
      odoo.update("product", id = row["product_id"], name = row["name"])

      # write status; pq.update_cell is assumed here as the writeback call
      pq.update_cell(
         table_name = "products",
         field_name = "sync_status",
         row_id = row["id"],  # must be value from primary key column
         value = "Processed")
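For experimenting with the flag pattern outside Peliqan, here is a self-contained sketch. It assumes an in-memory SQLite `products` table with a `sync_status` column, and `push_to_destination` is a hypothetical stand-in for the real update call. The flag is written only after the push returns, so a row whose push raises an error stays unflagged and is retried on the next run.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, sync_status TEXT)"
)
conn.executemany(
    "INSERT INTO products (id, name) VALUES (?, ?)",
    [(1, "Chair"), (2, "Table")],
)

pushed = []

def push_to_destination(row_id, name):
    # Hypothetical stand-in for the real destination update
    pushed.append((row_id, name))

def sync_once(conn):
    """Push every row that is not flagged yet, then flag it as processed."""
    rows = conn.execute(
        "SELECT id, name FROM products WHERE sync_status IS NULL ORDER BY id"
    ).fetchall()
    for row_id, name in rows:
        push_to_destination(row_id, name)
        conn.execute(
            "UPDATE products SET sync_status = 'Processed' WHERE id = ?",
            (row_id,),
        )
    conn.commit()
    return len(rows)

first_count = sync_once(conn)    # both rows are pushed and flagged
second_count = sync_once(conn)   # nothing left to push
```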