Writing data to tables

Writing to tables

Here are basic examples of writing data to tables:

dbconn = pq.dbconnect(pq.DW_NAME)

# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')

# Write a list of records to a table using pipeline logic, which will automatically create the table and columns (details below)
dbconn.write('schema_name', 'table_name', records, pk='id')

Difference between upsert() and write()

Difference between dbconn.upsert() and dbconn.write():

|                                      | dbconn.upsert()                                                      | dbconn.write()                                                                                           |
| ------------------------------------ | -------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------- |
| Use case                             | Insert and update records in operational tables.                     | Implement a pipeline: write records to a target table.                                                    |
| Avoids duplicates                    | Yes (based on PK).                                                   | Yes (based on PK).                                                                                        |
| Creates columns                      | No.                                                                  | Yes. If a record contains new keys, the corresponding columns are automatically added to the table.       |
| If PK does not exist                 | Inserts a new row.                                                   | Inserts a new row.                                                                                        |
| If PK exists                         | Updates the row: columns not present in upsert() remain unchanged.   | Replaces the row: columns not present in write() become empty.                                            |
| Metadata columns                     | None.                                                                | Adds and updates metadata columns such as _sdc_batched_at (timestamp of the update), _sdc_sequence, etc.  |
| Avoids unneeded updates (*)          | Yes.                                                                 | Yes.                                                                                                      |
| Creates child tables for nested data | No.                                                                  | Yes.                                                                                                      |

(*) The record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily. This is important in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target, because it avoids constantly triggering updates/syncs.
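
To make the difference in update behavior concrete, here is a small illustrative sketch (the database, schema and table names are placeholders, and the example assumes a row with primary key 1 and columns id, name and age already exists):

dbconn = pq.dbconnect(pq.DW_NAME)

# Existing row in the table: {'id': 1, 'name': 'John', 'age': 30}

# upsert(): only the provided columns change; 'age' keeps its current value (30)
dbconn.upsert('db_name', 'schema_name', 'table_name', 1, {'id': 1, 'name': 'Johnny'})

# write(): the row is replaced; 'age' becomes empty because it is not in the record
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'Johnny'}], pk='id')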

dbconn.write(): writing to tables using data pipeline logic

The function dbconn.write() uses pipeline logic (Singer) to write rows to a target table in a data warehouse.

This function will create the table if it does not exist, and it will add or alter columns when needed.

dbconn = pq.dbconnect(pq.DW_NAME)

# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')

# Write a record (object, dict) to a table - example
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write a dataframe to a table
records_list = df.to_dict(orient = 'records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write in batches
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')

dbconn.write(): splitting of columns when data types change

By default, the function dbconn.write() performs introspection on the provided data to discover the column types.

If a column's type changes between two writes, the existing column will be split into two columns.

For example, an existing numeric column age will be split into age__i and age__s when you send a text value for age.

The __s column will contain the string values, and the __i column will contain the integer values.

Similarly, __s (string) and __t (timestamp) columns will be created when a string (text) value is written to an existing timestamp column.

Example:

dbconn.write(schema_name, table_name, records=[{'id': 1, 'name': 'John', 'age': 30}], pk='id')
# Result: a new table will be created with an id, name and age column. The age column will be of type 'integer'.

dbconn.write(schema_name, table_name, records=[{'id': 2, 'name': 'Anne', 'age': '20-30'}], pk='id')
# Result: the age column will be split into age__i and age__s.
Split columns can be fixed afterwards with a cleanup script.
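
The official fix script is not reproduced here; as a rough, hypothetical sketch of the idea, the split columns could be merged back into a single text column with plain SQL through dbconn.execute(). The database, schema, table and column names are placeholders, and the exact SQL may need to be adapted to your data warehouse:

dbconn = pq.dbconnect(pq.DW_NAME)

# Hypothetical cleanup: merge age__i and age__s back into one text column 'age'
dbconn.execute('db_name', query="ALTER TABLE schema_name.table_name ADD COLUMN age VARCHAR")
dbconn.execute('db_name', query="UPDATE schema_name.table_name SET age = COALESCE(CAST(age__i AS VARCHAR), age__s)")
dbconn.execute('db_name', query="ALTER TABLE schema_name.table_name DROP COLUMN age__i")
dbconn.execute('db_name', query="ALTER TABLE schema_name.table_name DROP COLUMN age__s")

After merging, providing an object_schema on subsequent writes (see the next section) prevents the columns from splitting again.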

dbconn.write() with a schema

The above splitting of columns can be avoided by providing an object_schema.

Especially when writing in batches, make sure to always send an object_schema. Otherwise, introspection is done per batch (instead of on the entire data set at once), which can cause split columns: for example, all ages are numeric in one batch, while a second batch contains a text value for the age column.

Example:

# Write records to a table with a schema definition
object_schema = {
    "properties": {
        "id": {
            "type": ["integer", "null"]
        },
        "name": {
            "type": ["string", "null"]
        },
        "age": {
            "type": ["integer", "null"]
        }
    }
}

records = [
    {
        'id': 1,
        'name': 'John',
        'age': 20
    }
]

dbconn.write('schema_name', 'table_name', records, object_schema=object_schema, pk='id')

# You can also extract the schema from one or more records, update it as needed, and use that object_schema in the dbconn.write() function
records = [{'id': 1, 'name': 'Bob', 'age': None}]
object_schema = pq.discover_object_schema(records)
object_schema["properties"]["age"]["type"] = ["integer", "null"] # Fix the schema for the age column
dbconn.write('schema_name', 'table_name', records, object_schema=object_schema, pk='id')

# Example writing a large number of rows in batches with a fixed schema
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
object_schema = pq.discover_object_schema(batches[0])
object_schema["properties"]["age"]["type"] = ["integer", "null"] # Fix the schema for the age column, e.g. because it was empty in the first batch
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, object_schema=object_schema, pk='id')
It is also possible to keep the schema of a target table fixed after its initial creation.
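
A minimal sketch of one way to do this, assuming the schema discovered on the first run is persisted (here in a local JSON file, which is an assumption; any storage will do) and reused unchanged for every later write:

import json
import os

dbconn = pq.dbconnect(pq.DW_NAME)
SCHEMA_FILE = 'table_name_schema.json'  # hypothetical location to persist the schema

if os.path.exists(SCHEMA_FILE):
    # Later runs: reuse the schema that was saved when the table was first created
    with open(SCHEMA_FILE) as f:
        object_schema = json.load(f)
else:
    # First run: discover the schema from the initial records and persist it
    object_schema = pq.discover_object_schema(records)
    with open(SCHEMA_FILE, 'w') as f:
        json.dump(object_schema, f)

# Every write uses the same fixed schema, so the table's columns never change or split
dbconn.write('schema_name', 'table_name', records, object_schema=object_schema, pk='id')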

dbconn.write() with a schema and transformer

When providing an object_schema, you can optionally also transform the input data to make it “fit” the provided schema, using the parameter transformer_mode.

object_schema = {
    "properties": {
        "id": {
            "type": ["integer", "null"]
        },
        "name": {
            "type": ["string", "null"]
        },
        "age": {
            "type": ["integer", "null"]
        }
    }
}

records = [{'id': 13, 'name': 'Vero', 'age': 13.6}]

# Example that will convert age 13.6 to 13 to make it fit the provided object_schema
result = dbconn.write(schema_name, table_name, records=records, pk='id', object_schema=object_schema, transformer_mode='lossy')

Options for transformer_mode:

  • strict: The transformer strictly validates data against the object_schema, without performing any transformations on the original data (e.g. an int will not become a string). An error is thrown for non-matching data.
  • lossless (default): The transformer performs data conversion with a best-effort algorithm. If data loss is detected, an error is thrown (e.g. an int will become a string, but a float will not become an int).
  • lossy: The transformer performs data conversion on all data while silently ignoring data loss (e.g. a float will become an int).
  • None: The transformation step is skipped.
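
As an illustrative sketch (not taken verbatim from the documentation), the same record could be written under the different modes; the exact exception type raised by strict and lossless mode is not documented here, so a generic Exception is caught as an assumption:

records = [{'id': 13, 'name': 'Vero', 'age': 13.6}]

# lossy: 13.6 is silently converted to 13 so that it fits the integer 'age' column
dbconn.write(schema_name, table_name, records=records, pk='id', object_schema=object_schema, transformer_mode='lossy')

# strict (and lossless): the float value does not fit the integer schema, so an error is raised
try:
    dbconn.write(schema_name, table_name, records=records, pk='id', object_schema=object_schema, transformer_mode='strict')
except Exception as e:  # exact exception type is an assumption
    print('strict mode rejected the record:', e)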