The pq module exposes Peliqan's low-code data and data pipeline functions. It is automatically available when you write Python code in Peliqan Data apps; to use it in your local IDE, install the package with pip install peliqan and add import peliqan as pq to your code.
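For local development, the setup described above is a one-time install plus an import (inside Peliqan Data apps the import is not needed, since pq is already available):
# Install once in a terminal: pip install peliqan
import peliqan as pq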
Data warehouse, databases and tables
Connect to data warehouse or DB
dbconn = pq.dbconnect('dw_2')
# Use this connection to read data
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')
Read
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')
# Read data using custom query
rows = dbconn.fetch('db_name', query='SELECT * FROM schema_name.table_name')
# Get result as a data frame
df = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)
# Options:
# df=True: return a DataFrame instead of a list of objects
# fillna_with='': replace NaN (not a number) values with a given value, e.g. an empty string
# fillnat_with='': replace NaT (not a time) values with a given value
# enable_python_types=True: use Python types in the response
# enable_datetime_as_string=True: return datetime columns as strings
# tz='UTC': timezone for datetimes
Insert
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
Update
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
Upsert
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
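For example, a minimal sketch that inserts, updates and upserts one record, assuming a contacts table whose primary key column contains the value 42 (table name, field names and PK value are illustrative):
record_dict = {'name': 'John', 'email': 'john@doe.com'}
dbconn.insert('db_name', 'schema_name', 'contacts', record_dict)      # add a new row
dbconn.update('db_name', 'schema_name', 'contacts', 42, record_dict)  # update the row with PK value 42
dbconn.upsert('db_name', 'schema_name', 'contacts', 42, record_dict)  # update if PK 42 exists, insert otherwise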
Write
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')
# Write with a schema provided
object_schema = {
    'properties': {
        'id': {'type': 'integer'},
        'name': {'type': 'string'}
    }
}
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], object_schema, pk='id')
Execute query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
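execute can be used for statements where you do not need rows back, such as the TRUNCATE above. Another illustrative example (statement, table and column names are hypothetical):
dbconn.execute('db_name', query="DELETE FROM schema_name.table_name WHERE status = 'archived'")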
Federated query engine Trino
Connect Trino
trinoconn = pq.trinoconnect()
Read
rows = trinoconn.fetch('db_or_catalog_name', 'schema_name', 'table_name')
# Read data using custom query
rows = trinoconn.fetch('db_name', query='SELECT * FROM catalog.schema.table_name')
# Get result as a data frame
df = trinoconn.fetch('db_name', 'schema_name', 'table_name', df=True)
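Since Trino is a federated engine, a single query can join tables across catalogs, i.e. across connected sources. A sketch, assuming two catalogs named crm and erp (all catalog, schema, table and column names are illustrative):
rows = trinoconn.fetch('db_name', query='''
    SELECT c.name, SUM(o.amount) AS total
    FROM crm.public.customers c
    JOIN erp.public.orders o ON o.customer_id = c.id
    GROUP BY c.name
''')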
Writeback (SaaS APIs)
apiconn = pq.connect('connection_name')
apiconn.add('object_type', object)
apiconn.update('object_type', object)
apiconn.upsert('contact', contact, searchfield='email', searchterm='john@doe.com')
apiconn.delete('object_type', id=1)
apiconn.get('object_type', id=1)
apiconn.findone('object_type', searchfield='email', searchterm='john@doe.com')
apiconn.list('object_type')
apiconn.copy('object_type', id=1)
apiconn.rename('object_type', id=1, name='new name')
apiconn.apicall('path', method='get', body={})
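A minimal end-to-end sketch, assuming a connection named 'Hubspot' that exposes a contact object type (connection name and fields are illustrative):
crm = pq.connect('Hubspot')
contact = {'firstname': 'John', 'lastname': 'Doe', 'email': 'john@doe.com'}
crm.upsert('contact', contact, searchfield='email', searchterm='john@doe.com')
result = crm.findone('contact', searchfield='email', searchterm='john@doe.com')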
More info: Interacting with SaaS APIs
SFTP
Connect SFTP
sftpconn = pq.sftpconnect('connection_name')
List files
result = sftpconn.dir('.')
Read file
result = sftpconn.read_file('./file.json')
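For example, to parse a JSON file into a Python object, assuming read_file returns the file contents as a string:
import json
result = sftpconn.read_file('./file.json')
data = json.loads(result)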
State
Get state
pq.get_state()
Set state
pq.set_state(new_state)
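State lets a script persist values between runs, e.g. a watermark for incremental processing. An illustrative pattern, assuming you store the state as a dict (the shape of the state is up to you):
state = pq.get_state() or {}   # fall back to an empty dict on the first run
last_sync = state.get('last_sync')
# ... process records newer than last_sync ...
state['last_sync'] = '2025-01-01T00:00:00Z'
pq.set_state(state)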
Metadata
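The snippets below display results with st.json; st refers to Streamlit, which, like pq, is available when writing Python code in Peliqan Data apps.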
List databases
databases = pq.list_databases()
st.json(databases)
List schemas
schemas = pq.list_schemas(db_name)
st.json(schemas)
List tables
databases = pq.list_databases()
for db in databases:
    for table in db['tables']:
        st.text(table['name'])
List scripts (data apps)
scripts = pq.list_scripts()
st.json(scripts)
Get database
db_meta_data = pq.get_database(database_id=37734)
st.json(db_meta_data)
Get schema
schema_meta_data = pq.get_schema(schema_id=37734)
st.json(schema_meta_data)
Get table
table_meta_data = pq.get_table(table_id=37734)
st.json(table_meta_data)
Get script
script_meta_data = pq.get_script(script_id=745)
st.json(script_meta_data)
Update database
db_meta_data = pq.update_database(database_id=1603, description="This is a database description.")
st.json(db_meta_data)
Update schema
schema_meta_data = pq.update_schema(schema_id=1603, name="my_schema")
st.json(schema_meta_data)
Update table
table_meta_data = pq.update_table(table_id=37734, description="This is a table description.")
st.json(table_meta_data)
Update field
field_meta_data = pq.update_field(field_id=829137, description="This is a field description.")
st.json(field_meta_data)
Update script
script_meta_data = pq.update_script(script_id=745, name="Create and hydrate subaccount")
st.json(script_meta_data)
Pipelines
Run pipeline
Run pipeline for all active tables in the connection:
pq.run_pipeline(connection_name='Hubspot', is_async=False)
Run pipeline for one table:
pq.run_pipeline(connection_name='Hubspot', tables='deals')
Run pipeline for a list of tables:
pq.run_pipeline(connection_name='Hubspot', tables=['deals', 'contacts'])
Get pipeline runs
pq.get_pipeline_runs(source_connection_id)
Get pipeline logs
pq.get_pipeline_logs(pipeline_run_id)
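A sketch tying both calls together; note that the 'id' field on a pipeline run is an assumption, not confirmed here:
runs = pq.get_pipeline_runs(source_connection_id)
for run in runs:
    logs = pq.get_pipeline_logs(run['id'])  # 'id' field is assumed
    st.json(logs)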
Refresh
A refresh updates Peliqan's metadata for a connected database, e.g. to fetch an up-to-date list of the tables it contains.
Refresh database
pq.refresh_database(database_name='dw_12345', is_async=False)
Refresh schema
pq.refresh_schema(database_name='dw_12345', schema_name='CRM', is_async=False)
Refresh table
pq.refresh_table(database_name='dw_12345', schema_name='CRM', table_name='customers', is_async=False)