Define the expected schemas in a data contract and test your datasets against defined contracts.
Defining a data contract
Peliqan supports the open-source data contract standard defined by PayPal:
https://github.com/paypal/data-contract-template
Here’s a very simple example in YAML format, with many properties omitted:
dataset:
  - table: campaigns
    columns:
      - column: id
        logicalType: int
        isNullable: false
        description: unique id of the marketing campaign
      - column: name
        logicalType: string
        isNullable: true
        description: name of the marketing campaign
Testing a dataset for compliance with a data contract
Once you have defined a data contract, you can use a script to test one or more tables for compliance with the given contract.
Here’s a simplified end-to-end example:
import yaml
# Example data contract (YAML). The nesting is significant: each entry under
# `dataset` is one table, `columns` belongs to that table, and every
# `- column:` item carries its own logicalType / isNullable / description.
datacontract = """
dataset:
  - table: campaigns_template
    columns:
      - column: id
        logicalType: int
        isNullable: false
        description: unique id of the marketing campaign
      - column: name__1
        logicalType: string
        isNullable: true
        description: name of the marketing campaign
"""
def main():
    """Check the `campaigns` table against the `campaigns_template` contract entry."""
    contract_table = "campaigns_template"
    table_to_test = "campaigns"
    datacontract_test_table(datacontract, contract_table, table_to_test)
### Helper functions ###
def datacontract_get_table(datacontract, tablename):
    """Return the table definition named ``tablename`` from a data contract.

    Args:
        datacontract: YAML text of the data contract.
        tablename: Value of the ``table`` key to look for under ``dataset``.

    Returns:
        The first entry of ``dataset`` whose ``table`` key equals
        ``tablename`` (as parsed from the YAML), or ``None`` when no entry
        matches.

    Raises:
        Exception: If the contract has no top-level ``dataset`` key. This
            includes an empty document or one that is not a mapping.
    """
    dc = yaml.safe_load(datacontract)
    # An empty document parses to None and a scalar document to a plain value;
    # neither can carry a 'dataset' key, so report them the same way instead
    # of failing later with an unrelated TypeError.
    if not isinstance(dc, dict) or 'dataset' not in dc:
        raise Exception("Key 'dataset' missing in data contract.")
    # `dataset:` with nothing under it parses to None; treat it as "no tables".
    for table in dc['dataset'] or []:
        # Skip malformed entries (e.g. bare strings) rather than crashing on them.
        if isinstance(table, dict) and 'table' in table and table['table'] == tablename:
            return table
    # No match: callers must handle a missing table explicitly.
    return None
def datacontract_to_sql(datacontract, datacontract_tablename, tablename):
    """Build a SELECT statement that probes ``tablename`` against a contract.

    Every column listed for ``datacontract_tablename`` in the contract is
    selected from ``tablename``. Columns with logicalType ``int`` are wrapped
    in ``round()`` and columns with logicalType ``string`` in ``trim()``;
    other columns are selected as-is.

    Args:
        datacontract: YAML text of the data contract.
        datacontract_tablename: Name of the table entry inside the contract.
        tablename: Name of the actual table to query.

    Returns:
        The SQL text ``SELECT <columns> FROM <tablename>``.

    Raises:
        Exception: If the contract has no ``dataset`` key (raised by
            ``datacontract_get_table``) or contains no table named
            ``datacontract_tablename``.
    """
    table = datacontract_get_table(datacontract, datacontract_tablename)
    if table is None:
        # Without this check the lookup below fails with an opaque
        # "argument of type 'NoneType' is not iterable".
        raise Exception("Table '%s' missing in data contract." % datacontract_tablename)

    sql_columns = []
    # A `columns:` key with nothing under it parses to None; treat it as empty.
    for column in table.get('columns') or []:
        if 'column' not in column:
            continue
        # NOTE(review): identifiers are interpolated unquoted; this assumes the
        # contract is authored by a trusted user.
        columnselect = column['column']
        if 'logicalType' in column:
            if column['logicalType'] == 'int':
                columnselect = "round(%s)" % columnselect  # test if value is numeric
            elif column['logicalType'] == 'string':
                columnselect = "trim(%s)" % columnselect  # test if value is string
        sql_columns.append(columnselect)

    sql_columns_csv = ','.join(sql_columns)
    return "SELECT %s FROM %s" % (sql_columns_csv, tablename)
def datacontract_test_table(datacontract, datacontract_tablename, tablename):
    """Run the contract probe query on ``tablename`` and report the outcome.

    The SQL produced by ``datacontract_to_sql`` is executed with ``LIMIT 1``
    appended. If the query raises, an error is shown and the script exits;
    otherwise a success message is shown.

    NOTE(review): ``pq`` and ``st`` are not defined in this part of the file;
    presumably the host runtime provides them -- confirm.

    Args:
        datacontract: YAML text of the data contract.
        datacontract_tablename: Name of the table entry inside the contract.
        tablename: Name of the actual table to test.
    """
    probe_query = datacontract_to_sql(datacontract, datacontract_tablename, tablename)
    try:
        # Only the success or failure of the query matters; the row is discarded.
        pq.load_table(query=probe_query + " LIMIT 1")
    except Exception as err:
        failure_message = (
            "Table %s does not comply with the data contract ! Detailed error: %s"
            % (tablename, err)
        )
        st.error(failure_message)
        exit()
    success_message = "Table %s complies with the data contract !" % tablename
    st.success(success_message)
# Script entry point: run the compliance test when the script is executed.
main()
When the test fails, you could send an alert, for example to a Slack channel: