Incremental processing from source

Keeping track of what was processed

When you have many records to sync from a source to a destination, you want to avoid reprocessing all of them on each run of your scheduled script. Instead, you only want to process new and updated records.

Incremental processing using a timestamp

You can process records incrementally by filtering on a “last update” timestamp. By storing the highest timestamp that was already processed in the script’s state, you implement a stateful script that only processes new and updated records on each run.

Here’s an example script using a timestamp stored in the state of the script:

previous_timestamp = pq.get_state()

if not previous_timestamp:
  previous_timestamp = "1970-01-01 00:00:00"

dbconn = pq.dbconnect(pq.DW_NAME)
my_query = "SELECT * FROM orders WHERE timestamp_last_update > '%s' ORDER BY timestamp_last_update" % previous_timestamp
source_records = dbconn.fetch(pq.DW_NAME, query = my_query)

highest_timestamp_processed = None

for source_record in source_records:
    odoo.upsert("order", reference = source_record["code"], ...)
    highest_timestamp_processed = source_record["timestamp_last_update"]

if highest_timestamp_processed:
    # take timestamp of last source record that was processed
    pq.set_state(highest_timestamp_processed)

# Note: the timestamp column is usually _sdc_batched_at for target tables,
# _sdc_batched_at is the timestamp the record was added or updated in the DWH

Incremental processing by comparing source & destination

This pattern is more robust than the timestamp-based pattern. With a timestamp, each record is processed only once: if the writeback fails for a specific record, that record will not be picked up again on future runs.

An alternative pattern is to compare the data from source & destination using a JOIN query in Peliqan first, and using the result of that JOIN query as input for the sync script to process records.

Example JOIN query to compare data from a source and a destination:

SELECT
  source_order.id,
  source_order.reference,
  source_order.price,
  source_order.item_count,
  destination_order.id,
  destination_order.code,
  destination_order.total_price,
  destination_order.number_of_items
FROM source_order
LEFT JOIN destination_order
  ON source_order.reference = destination_order.code -- fields to match on
WHERE
  destination_order.id IS NULL OR -- include records that do not exist in the destination
  (source_order.price <> destination_order.total_price OR
   source_order.item_count <> destination_order.number_of_items) -- include source records that differ from the destination

The result of this JOIN only includes records that need to be processed; these can now be picked up by a sync script.
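As an illustration, the same “new or changed” filter can be sketched in plain Python. This is not the Peliqan API, just a standalone example with hypothetical sample data; the field names mirror the JOIN query above:

```python
# Hypothetical sample data mirroring the source_order and destination_order tables.
source_orders = [
    {"id": 1, "reference": "A1", "price": 10.0, "item_count": 2},
    {"id": 2, "reference": "A2", "price": 15.0, "item_count": 1},
    {"id": 3, "reference": "A3", "price": 20.0, "item_count": 4},
]
destination_orders = {
    "A1": {"code": "A1", "total_price": 10.0, "number_of_items": 2},  # in sync
    "A2": {"code": "A2", "total_price": 12.0, "number_of_items": 1},  # price differs
    # "A3" does not exist in the destination yet
}

def needs_processing(src, dest_by_code):
    """Same logic as the WHERE clause of the JOIN query."""
    dest = dest_by_code.get(src["reference"])
    if dest is None:
        return True  # record does not exist in the destination
    return (src["price"] != dest["total_price"]
            or src["item_count"] != dest["number_of_items"])

to_process = [s for s in source_orders if needs_processing(s, destination_orders)]
print([s["reference"] for s in to_process])  # → ['A2', 'A3']
```

Only the changed record (A2) and the missing record (A3) are selected; the record that is already in sync (A1) is skipped.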

Getting and setting state in a script

A script is stateful when it keeps track of its state across runs.


# Get state from previous run
previous_state = pq.get_state()

# Save state for the next run
pq.set_state(current_state)

The state that you set (save) can be a string, a number or a dict:

  • String: e.g. store the guid of the last processed record (assuming records are sorted) or store the highest timestamp processed
  • Number: e.g. store the id of the last processed record (assuming records are sorted)
  • Dict: store multiple states, e.g. one for each table

Example of a dict state with a timestamp stored per table:

{
  "contacts": "2023-07-25T10:10:48+00:00",
  "companies": "2023-07-23T10:11:23+00:00"
}

When you update a state that is a dict, the update is merged with the stored state. Example update:

pq.set_state({"contacts": "2023-07-25T10:10:48+00:00"})

# The new stored state will have the updated key for "contacts" 
# and it will still include other keys as well, e.g.:

{
  "contacts": "2023-07-25T10:10:48+00:00",
  "companies": "2023-07-23T10:11:23+00:00"
}
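This merge behaves like a shallow dict update, which can be sketched in plain Python as follows (an illustration of the semantics, not the actual Peliqan implementation):

```python
# Sketch of the merge behavior of a dict state (plain Python illustration,
# not the actual Peliqan implementation).
stored_state = {
    "contacts": "2023-07-20T08:00:00+00:00",
    "companies": "2023-07-23T10:11:23+00:00",
}
update = {"contacts": "2023-07-25T10:10:48+00:00"}

# Shallow merge: keys in the update overwrite stored keys,
# all other stored keys are kept.
new_state = {**stored_state, **update}

print(new_state["contacts"])   # updated key
print(new_state["companies"])  # untouched key is preserved
```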

One-time syncs

This is a special pattern where records are flagged as “processed” in a table so that they are processed only once. This pattern only works if no updates are needed after a record has been processed.

Here’s an example where we added a column called “sync_status” to a table, and use this column to keep track of rows that have already been processed. On each run of the script, rows that were already processed are skipped:

dbconn = pq.dbconnect(pq.DW_NAME)
rows = dbconn.fetch(pq.DW_NAME, 'shop', 'products')

for row in rows:
    if not row["sync_status"]: # skip rows that were already processed

        odoo.update("product", id = row["product_id"], name = row["name"])

        # write status
        pq.update_cell(
            table_name = "products",
            field_name = "sync_status",
            row_id = row["id"],  # must be value from primary key column
            value = "Processed")