Reference

Data apps are interactive apps, built as single-file Python scripts, that run on Peliqan. This page provides a reference for building Data apps.

Data apps on the Peliqan platform

Peliqan is an all-in-one data platform with a wide range of components that can be controlled and used from within Data apps through Peliqan’s pq module:

  • Connectors (ETL pipelines that sync data from sources into a data warehouse)
  • Built-in data warehouse, and the ability to connect an external data warehouse as ETL target
  • Data explorer & SQL queries for data transformations (queries can be configured as views and/or materialized)
  • Trino federated query engine
  • Data replication
  • Python IDE and runtime to develop, test, host, schedule & publish Data apps

Peliqan pq module

The pq module exposes Peliqan’s low-code, data-centric and data-pipeline functions. It is automatically available when you write Python code in Data apps on Peliqan, and it can be used in your local IDE by running pip install peliqan and adding import peliqan as pq to your code.
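For local development, a minimal setup could look as follows. This is a sketch based on the snippet under Useful code snippets at the bottom of this page; it assumes your API key is stored in a PELIQAN_API_KEY environment variable:

import os
from peliqan import Peliqan

# Outside of Peliqan the client must be instantiated explicitly;
# inside Peliqan, pq is already available.
pq = Peliqan(os.getenv("PELIQAN_API_KEY"))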

Constants

Run modes

Data apps can run locally on a computer (external to Peliqan) or inside the Peliqan platform.

When a Data app is hosted on Peliqan, some code is prepended to the script, e.g. to provide context and to automatically import Streamlit as st and the Peliqan module as pq.

The constant RUN_CONTEXT is always set when a Data app runs in Peliqan.

Data apps can run interactively or in the background (e.g. scheduled). Streamlit is used to build UIs for interactive apps.

Streamlit functions such as st.text(), st.write(), st.json(), st.table() etc. are converted to print statements in background runs.

Print statements are written to the log file of each run, visible in Peliqan.
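For example, the same output call works in both run modes:

# In an interactive run this renders in the Streamlit UI;
# in a background run it becomes a print statement and ends up in the run log.
st.text("Pipeline finished, 42 records processed")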

Data warehouse, databases and tables

Connect to data warehouse or DB
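No connect example is shown here. Based on the other connect helpers on this page (pq.trinoconnect, pq.sftpconnect), a sketch could look as follows; treat pq.dbconnect and its argument as assumptions:

# Assumption: the data warehouse connection is opened by name,
# like the Trino and SFTP helpers below.
dbconn = pq.dbconnect('dw_name')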

Read
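A read example is also missing; a hedged sketch mirroring the Trino fetch signature shown further below:

# Assumption: dbconn.fetch mirrors trinoconn.fetch (see the Trino section).
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Get the result as a data frame (df flag assumed, as in the Trino example)
df = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)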

Insert

dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

Update

dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Upsert

dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Write

Execute query

dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
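For example, a simple full-refresh write using only the calls above (records is a hypothetical list of record dicts):

# Empty the target table, then reload it record by record.
# records: hypothetical list of dicts, one per row.
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
for record in records:
    dbconn.insert('db_name', 'schema_name', 'table_name', record)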

Federated query engine Trino

Connect Trino

trinoconn = pq.trinoconnect()

Read

rows = trinoconn.fetch('db_or_catalog_name', 'schema_name', 'table_name')

# Read data using custom query
rows = trinoconn.fetch('db_name', query='SELECT * FROM catalog.schema.table_name')

# Get result as a data frame
df = trinoconn.fetch('db_name', 'schema_name', 'table_name', df=True)

Writeback (SaaS APIs)

More info:

Interacting with SaaS APIs
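As a hedged illustration only (the authoritative writeback API is documented on the page linked above; pq.connect and the method name here are assumptions):

# Assumption: a SaaS connection is opened by name and exposes write methods.
hubspot = pq.connect('Hubspot')
hubspot.add('contact', {'email': 'jane@example.com'})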

SFTP

Connect SFTP

sftpconn = pq.sftpconnect('connection_name')

List files

result = sftpconn.dir('.')

Read file

result = sftpconn.read_file('./file.json')

State

Get state

state = pq.get_state()

if not state:
	state={}

settings = state.get("settings", {})

Set state

new_state = {
	"settings": { ... }
}

pq.set_state(new_state)
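Combined, state can be used to persist small values between runs; a minimal sketch using only the calls above:

# Read state, update a counter, and write it back.
state = pq.get_state() or {}
state["run_count"] = state.get("run_count", 0) + 1
pq.set_state(state)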

Metadata

List databases

databases = pq.list_databases()
st.json(databases)

List schemas

schemas = pq.list_schemas(db_name)
st.json(schemas)

List tables

databases = pq.list_databases()
for db in databases:
	for table in db['tables']:
		st.text(table['name'])

List scripts (data apps)

scripts = pq.list_scripts()
st.json(scripts)

Get database

db_meta_data = pq.get_database(database_id=37734)
st.json(db_meta_data)

Get schema

schema_meta_data = pq.get_schema(schema_id=37734)
st.json(schema_meta_data)

Get table

table_meta_data = pq.get_table(table_id=37734)
st.json(table_meta_data)

Get script

script_meta_data = pq.get_script(script_id=745)
st.json(script_meta_data)

Update database

db_meta_data = pq.update_database(database_id=1603, description="This is a database description.")
st.json(db_meta_data)

Update schema

schema_meta_data = pq.update_schema(schema_id=1603, name="my_schema")
st.json(schema_meta_data)

Update table

table_meta_data = pq.update_table(table_id=37734, description="This is a table description.")
st.json(table_meta_data)

Update field

field_meta_data = pq.update_field(field_id=829137, description="This is a field description.")
st.json(field_meta_data)

Update script

script_meta_data = pq.update_script(script_id=745, name="Create and hydrate subaccount")
st.json(script_meta_data)

Add script

pq.add_script(group_id=123, name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...', run_mode=STREAMLIT)

Supported values for run_mode are STREAMLIT, API and SHELL.

Add or update a query

Pipelines

Run pipeline

Run pipeline for all active tables in the connection:

pq.run_pipeline(connection_name = 'Hubspot', is_async = False)

Run pipeline for one table:

pq.run_pipeline(connection_name = 'Hubspot', tables = "deals")

Run pipeline for a list of tables:

pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals", "contacts"])

Run pipeline with a specific parameter (overrides connection form):

pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals"], param1 = 100)

In this example, param1 overrides the “pagesize” parameter in the connection form.

Materialize a table:

pq.materialize([(table_id, 'target_table_name')], 'target_schema_name')

Materialize a table to a sub-account:

pq.subaccount_materialize(connection_name = 'Hubspot', subaccount_id = 123, table_source_list = ['contacts'], target_schema = 'materialized_queries')

Get pipeline runs

pq.get_pipeline_runs(source_connection_id)

Get pipeline logs

pq.get_pipeline_logs(pipeline_run_id)
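Combining the two calls, a sketch for inspecting the most recent run (the 'id' key on a run is an assumption; the actual response shape is not documented here):

runs = pq.get_pipeline_runs(source_connection_id)
if runs:
    logs = pq.get_pipeline_logs(runs[0]['id'])  # 'id' key assumed
    st.json(logs)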

Refresh

A refresh updates the metadata in Peliqan for a connected database, e.g. retrieving an up-to-date list of the tables in that database.

Refresh database

pq.refresh_database(database_name = 'dw_12345', is_async = False)

Refresh schema

pq.refresh_schema(database_name = 'dw_12345', schema_name = 'CRM', is_async = False)

Refresh table

pq.refresh_table(database_name = 'dw_12345', schema_name = 'CRM', table_name = 'customers', is_async = False)

General

Generic API call to Peliqan backend

Example: update a field (column) in a table:

method = 'post'
url = f"{pq.BACKEND_URL}/api/database/fields/{field_id}/"
kwargs={
    'json': {
        'type': 'link_row',
        'fk_table_id': linked_table_id,
        'name': field_name
    }
}
pq.__service_client__.call_backend(method=method, url=url, expected_status_code=200, **kwargs)

Useful code snippets

Code to run Data apps locally (outside of Peliqan) as well as on Peliqan, without requiring any code changes:

if 'RUN_CONTEXT' not in globals(): # Running outside of Peliqan
    import os
    from peliqan import Peliqan
    import streamlit as st
    api_key = os.getenv("PELIQAN_API_KEY")
    if not api_key:
        st.error("PELIQAN_API_KEY environment variable is not set.")
        st.stop()
    pq = Peliqan(api_key)
    RUN_CONTEXT = "interactive"

# Other code goes here

List all databases, schemas, tables in current account:
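A sketch using the metadata calls documented above; the 'name' and 'tables' keys follow the List tables example, while the shape of the schema listing is not documented here:

databases = pq.list_databases()
for db in databases:
    st.text(db['name'])                    # database name ('name' key assumed)
    schemas = pq.list_schemas(db['name'])  # per-database schemas
    st.json(schemas)
    for table in db['tables']:             # 'tables' key as in the List tables example
        st.text(table['name'])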

Get table fields & PK:
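The field and primary-key shape is not documented on this page; a sketch assuming pq.get_table returns a 'fields' list where each field carries a 'name' and a 'primary' flag (both assumptions):

table_meta_data = pq.get_table(table_id=37734)
for field in table_meta_data.get('fields', []):           # 'fields' key assumed
    pk_marker = ' (PK)' if field.get('primary') else ''   # 'primary' flag assumed
    st.text(field['name'] + pk_marker)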

Apply Peliqan’s look & feel in Streamlit interactive Data apps:
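Peliqan’s actual theme is not reproduced here; a generic Streamlit pattern for injecting custom CSS (the styles themselves are placeholders):

st.markdown(
    """
    <style>
    /* Placeholder styles; replace with Peliqan's actual colors and fonts */
    .stApp { font-family: sans-serif; }
    </style>
    """,
    unsafe_allow_html=True,
)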

Logging:

import inspect

LOG_LEVEL = 100

def print_log(msg: str, level: int = 1) -> None:
    # Print messages with a level below LOG_LEVEL, prefixed with the caller's line number.
    if level < LOG_LEVEL:
        caller_frame = inspect.currentframe().f_back
        line_no = caller_frame.f_lineno
        print(f"{line_no} → {msg}")

API handler Data app example: