Connectors

Contact support

Reference

The pq module exposes Peliqan’s low-code, data-centric and data pipeline functions. The pq module is automatically available in Peliqan when you write Python code in Data apps. To use it in your local IDE, run `pip install peliqan` and add `import peliqan as pq` to your code.

Data warehouse, databases and tables

Connect to data warehouse or DB

dbconn = pq.dbconnect('dw_2')

# Use this connection to read data
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

Read

rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Read data using a custom query
rows = dbconn.fetch('db_name', query='SELECT * FROM schema_name.table_name')

# Get result as a data frame
df = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)

# Options:
# df = True: set to True to return a Dataframe instead of a list of objects
# fillna_with = '': replace NaN (not a number) with a given value, e.g. an empty string
# fillnat_with = '': replace NaT (not a time, i.e. a missing datetime) with a given value, e.g. an empty string
# enable_python_types = True: use Python types in response
# enable_datetime_as_string = True: return datetime columns as string
# tz='UTC': timezone for datetimes

Insert

dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

Update

dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Upsert

dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Write

dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write with a schema provided
object_schema = {
	'properties': {
		'id': {'type': 'integer'},
		'name': {'type': 'string'}
	}
}
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], object_schema, pk='id')

Execute query

dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')

Federated query engine Trino

Connect Trino

trinoconn = pq.trinoconnect()

Read

rows = trinoconn.fetch('db_or_catalog_name', 'schema_name', 'table_name')

# Read data using a custom query
rows = trinoconn.fetch('db_name', query='SELECT * FROM catalog.schema.table_name')

# Get result as a data frame
df = trinoconn.fetch('db_name', 'schema_name', 'table_name', df=True)

Writeback (SaaS APIs)

apiconn = pq.connect('connection_name')

apiconn.add('object_type', object)

apiconn.update('object_type', object)

apiconn.upsert('contact', contact, searchfield='email', searchterm='john@doe.com')

apiconn.delete('object_type', id=1)

apiconn.get('object_type', id=1)

apiconn.findone('object_type', searchfield='email', searchterm='john@doe.com')

apiconn.list('object_type')

apiconn.copy('object_type', id=1)

apiconn.rename('object_type', id=1, name='new name')

apiconn.apicall('path', method='get', body={})

More info:

Interacting with SaaS APIs

SFTP

Connect SFTP

sftpconn = pq.sftpconnect('connection_name')

List files

result = sftpconn.dir('.')

Read file

result = sftpconn.read_file('./file.json')

State

Get state

pq.get_state()

Set state

pq.set_state(new_state)

Metadata

List databases

databases = pq.list_databases()
st.json(databases)

List schemas

schemas = pq.list_schemas(db_name)
st.json(schemas)

List tables

databases = pq.list_databases()
for db in databases:
	for table in db['tables']:
		st.text(table['name'])

List scripts (data apps)

scripts = pq.list_scripts()
st.json(scripts)

Get database

db_meta_data = pq.get_database(database_id=37734)
st.json(db_meta_data)

Get schema

schema_meta_data = pq.get_schema(schema_id=37734)
st.json(schema_meta_data)

Get table

table_meta_data = pq.get_table(table_id=37734)
st.json(table_meta_data)

Get script

script_meta_data = pq.get_script(script_id=745)
st.json(script_meta_data)

Update database

db_meta_data = pq.update_database(database_id=1603, description="This is a database description.")
st.json(db_meta_data)

Update schema

schema_meta_data = pq.update_schema(schema_id=1603, name="my_schema")
st.json(schema_meta_data)

Update table

table_meta_data = pq.update_table(table_id=37734, description="This is a table description.")
st.json(table_meta_data)

Update field

field_meta_data = pq.update_field(field_id=829137, description="This is a field description.")
st.json(field_meta_data)

Update script

script_meta_data = pq.update_script(script_id=745, name="Create and hydrate subaccount")
st.json(script_meta_data)

Pipelines

Run pipeline

Run pipeline for all active tables in the connection:

pq.run_pipeline(connection_name = 'Hubspot', is_async = False)

Run pipeline for one table:

pq.run_pipeline(connection_name = 'Hubspot', tables = "deals")

Run pipeline for a list of tables:

pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals", "contacts"])

Get pipeline runs

pq.get_pipeline_runs(source_connection_id)

Get pipeline logs

pq.get_pipeline_logs(pipeline_run_id)

Refresh

A refresh updates the metadata in Peliqan for a connected database, for example to get an up-to-date list of the tables in that database.

Refresh database

pq.refresh_database(database_name = 'dw_12345', is_async = False)

Refresh schema

pq.refresh_schema(database_name = 'dw_12345', schema_name = 'CRM', is_async = False)

Refresh table

pq.refresh_table(database_name = 'dw_12345', schema_name = 'CRM', table_name = 'customers', is_async = False)