Writing to tables
Here are basic examples of writing data to tables:
dbconn = pq.dbconnect(pq.DW_NAME)
# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
# Write a list of records to a table using pipeline logic, which will automatically create the table and columns (details below)
dbconn.write('schema_name', 'table_name', records, pk='id')
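For illustration, here is a minimal sketch with a concrete record, using the call signatures listed above (the record values are hypothetical; 'id' is assumed to be the primary key column):
dbconn = pq.dbconnect(pq.DW_NAME)
# A record is a plain dict: keys map to column names
record_dict = {'id': 1, 'name': 'John', 'email': 'john@example.com'}
# Insert the record
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
# Update the row with PK value 1, passing the changed record
record_dict['email'] = 'john.doe@example.com'
dbconn.update('db_name', 'schema_name', 'table_name', 1, record_dict)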
Difference between upsert() and write()
Difference between dbconn.upsert() and dbconn.write():
| | dbconn.upsert() | dbconn.write() |
| --- | --- | --- |
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes. If a record contains new keys, the corresponding columns are automatically added to the table. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Adds and updates metadata columns: _sdc_batched_at (timestamp of the update), _sdc_sequence, etc. |
| Avoids unneeded updates (*) | Yes. | Yes. |
| Creates child tables for nested data | No. | Yes. |
(*) The record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily, which is important in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target (it avoids constantly triggering updates/syncs).
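To make the difference concrete, here is a short sketch based on the behavior described above (table and values are hypothetical):
dbconn = pq.dbconnect(pq.DW_NAME)
# Assume schema_name.table_name currently contains the row: id=1, name='John', age=30
partial_record = {'id': 1, 'name': 'Johnny'}  # note: 'age' is not included
# upsert: updates 'name' and leaves 'age' unchanged (row becomes id=1, name='Johnny', age=30)
dbconn.upsert('db_name', 'schema_name', 'table_name', 1, partial_record)
# write: replaces the row, so 'age' becomes empty (row becomes id=1, name='Johnny', age=NULL)
dbconn.write('schema_name', 'table_name', [partial_record], pk='id')
# Writing the exact same record again does not update the row (see (*) above),
# so timestamp-based triggers in downstream syncs are not fired unnecessarily
dbconn.write('schema_name', 'table_name', [partial_record], pk='id')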
dbconn.write(): writing to tables using data pipeline logic
The function dbconn.write() uses pipeline logic (Singer) to write rows to a target table in a data warehouse. It will create the table if it does not exist, and it will add or alter columns when needed.
dbconn = pq.dbconnect(pq.DW_NAME)
# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')
# Write a record (object, dict) to a table - example
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')
# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write a dataframe to a table
records_list = df.to_dict(orient = 'records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write in batches (rows is a list of record dicts)
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')
dbconn.write() splitting of columns when data changes
By default the function dbconn.write() performs introspection on the provided data to discover the column types. If the column types change between two writes, an existing column will be split into two columns.
For example, an existing numeric column age will be split into age__s and age__i when you send a text value for age. The __s column will contain the string values, the __i column the integer values.
In a similar manner, an __s (string) and a __t (timestamp) column will be created when writing a string (text) to an existing timestamp column.
Example:
dbconn.write(schema_name, table_name, records=[{'id': 1, 'name': 'John', 'age': 30}], pk='id')
# Result: a new table will be created with an id, name and age column. The age column will be of type 'integer'.
dbconn.write(schema_name, table_name, records=[{'id': 2, 'name': 'Anne', 'age': '20-30'}], pk='id')
# Result: the age column will be split in age__i and age__s.
dbconn.write() with a schema
The above splitting of columns can be avoided by providing an object_schema.
Especially when writing in batches, make sure to always pass an object_schema; otherwise introspection is done per batch (instead of on the entire data set at once), which can cause split columns, e.g. when all ages are numeric in one batch while a second batch contains a text value for the age column.
Example:
# Write records to a table with a schema definition
object_schema = {
    "properties": {
        "id": {
            "type": ["integer", "null"]
        },
        "name": {
            "type": ["string", "null"]
        },
        "age": {
            "type": ["integer", "null"]
        }
    }
}
records = [
    {
        'id': 1,
        'name': 'John',
        'age': 20
    }
]
dbconn.write('schema_name', 'table_name', records, object_schema=object_schema, pk='id')
# You can also extract the schema from one or more records, update it as needed, and use that object_schema in the dbconn.write() function
records = [{'id': 1, 'name': 'Bob', 'age': None}]
object_schema = pq.discover_object_schema(records)
object_schema["properties"]["age"]["type"] = ["integer", "null"] # Fix the schema for the age column
dbconn.write('schema_name', 'table_name', records, object_schema=object_schema, pk='id')
# Example writing a huge amount of rows in batch with a fixed schema
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
object_schema = pq.discover_object_schema(batches[0])
object_schema["properties"]["age"]["type"] = ["integer", "null"] # Fix the schema for the age column because it was e.g. empty in the first batch
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, object_schema=object_schema, pk='id')
dbconn.write() with a schema and transformer
When providing an object_schema, you can optionally also transform the input data to make it fit the provided schema, by using the parameter transformer_mode.
object_schema = {
    "properties": {
        "id": {
            "type": ["integer", "null"]
        },
        "name": {
            "type": ["string", "null"]
        },
        "age": {
            "type": ["integer", "null"]
        }
    }
}
records = [{'id': 13, 'name': 'Vero', 'age': 13.6}]
# Example that will convert age 13.6 to 13 to make it fit the provided object_schema
result = dbconn.write(schema_name, table_name, records=records, pk='id', object_schema=object_schema, transformer_mode='lossy')
Options for transformer_mode:
- strict: This causes the transformer to strictly validate the data against the object_schema, without performing any transformations on the original data (e.g. an int will not become a string). An error is thrown for non-matching data.
- lossless (default): This causes the transformer to perform data conversion with a best-effort algorithm. If data loss is detected, an error is thrown (e.g. an int will become a string, but a float will not become an int).
- lossy: This causes the transformer to perform data conversion on all data while silently ignoring data loss (e.g. a float will become an int).
- None: The transformation step is skipped.
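A short sketch of how the modes differ, based on the descriptions above (reusing the object_schema from the previous example; the exact error types are not specified here):
records = [{'id': 14, 'name': 'Ann', 'age': 21.9}]  # age is a float, but the schema expects an integer
# 'lossy': the float is coerced to an integer, data loss is silently ignored
dbconn.write(schema_name, table_name, records=records, pk='id', object_schema=object_schema, transformer_mode='lossy')
# 'lossless' (default): an error is thrown, because converting the float to an integer would lose data
# 'strict': an error is thrown, because the data does not match the schema and no transformation is performed
# None: the transformation step is skipped entirely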