Data apps are interactive apps, built as single-file Python scripts, that run on Peliqan. This page provides a reference for building Data apps.
Data apps on the Peliqan platform
Peliqan is an all-in-one data platform with a wide range of components that can be controlled and used from within Data apps using Peliqan’s pq module:
- Connectors (ETL pipelines that sync data from sources into a data warehouse)
- Built-in data warehouse, and the ability to connect an external data warehouse as ETL target
- Data explorer & SQL queries for data transformations (queries can be configured as views and/or can be materialized)
- Trino federated query engine
- Data replication
- Python IDE and runtime to develop, test, host, schedule & publish Data apps
Peliqan pq module
The pq module exposes Peliqan’s low-code, data-centric and data pipeline functions. It is automatically available in Peliqan when you write Python code in Data apps, and it can be used in your local IDE by installing the package (pip install peliqan) and importing it (import peliqan as pq) in your code.
Constants
Run modes
Data apps can run locally on a computer (external to Peliqan) or inside the Peliqan platform.
When hosted on Peliqan, some code is prepended, e.g. to provide context and to automatically import Streamlit as st and Peliqan as pq.
Constant RUN_CONTEXT is always set when running a Data app in Peliqan.
Data apps can run interactively or in the background (e.g. scheduled). Streamlit is used to build UIs for interactive apps.
Streamlit functions such as st.text(), st.write(), st.json(), st.table() etc. are converted to print statements in background runs.
Print statements are written to the log file of each run, visible in Peliqan.
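A minimal sketch of code that works in both modes; the "interactive" value of RUN_CONTEXT is taken from the local-run snippet under Useful code snippets below, other values are not assumed here:
st.write("Processing started")  # shown in the app UI in interactive runs; written to the run log in background runs

if RUN_CONTEXT == "interactive":
    st.balloons()  # purely visual calls can be guarded so they only run interactively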
Data warehouse, databases and tables
Connect to data warehouse or DB
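The connect call itself is missing from this extract. As a sketch, assuming a pq.dbconnect() call that takes the data warehouse or connection name (mirroring pq.trinoconnect() and pq.sftpconnect() below):
dbconn = pq.dbconnect('dw_name')  # 'dw_name' is a placeholder; the exact signature is an assumption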
Read
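The read example is also missing here. A sketch, assuming the same fetch() signature as the Trino examples further down:
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Get the result as a data frame (df=True mirrors the Trino example below; assumed here)
df = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)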
Insert
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
Update
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
Upsert
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
Write
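The dedicated write example is not included in this extract. As a sketch, a batch of records can be written with the upsert call shown above (one call per record); check the Peliqan docs for a bulk write method:
records = [{'id': 1, 'name': 'Acme'}, {'id': 2, 'name': 'Globex'}]  # example records, 'id' used as PK
for record in records:
    dbconn.upsert('db_name', 'schema_name', 'table_name', record['id'], record)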
Execute query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
Federated query engine Trino
Connect Trino
trinoconn = pq.trinoconnect()
Read
rows = trinoconn.fetch('db_or_catalog_name', 'schema_name', 'table_name')
# Read data using custom query
rows = trinoconn.fetch('db_name', query='SELECT * FROM catalog.schema.table_name')
# Get result as a data frame
df = trinoconn.fetch('db_name', 'schema_name', 'table_name', df=True)
Writeback (SaaS APIs)
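A minimal sketch of writing back to a connected SaaS app; pq.connect() and the add() method are assumptions here, see the page linked below for the actual writeback API:
crm = pq.connect('Hubspot')  # placeholder: name of a connection in your Peliqan account
result = crm.add('contact', {'firstname': 'Jane', 'lastname': 'Doe'})  # object name and fields are examples
st.json(result)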
More info:
Interacting with SaaS APIs
SFTP
Connect SFTP
sftpconn = pq.sftpconnect('connection_name')
List files
result = sftpconn.dir('.')
Read file
result = sftpconn.read_file('./file.json')
State
Get state
state = pq.get_state()
if not state:
    state = {}
settings = state.get("settings", {})
Set state
new_state = {
"settings": { ... }
}
pq.set_state(new_state)
Metadata
List databases
databases = pq.list_databases()
st.json(databases)
List schemas
schemas = pq.list_schemas(db_name)
st.json(schemas)
List tables
databases = pq.list_databases()
for db in databases:
    for table in db['tables']:
        st.text(table['name'])
List scripts (data apps)
scripts = pq.list_scripts()
st.json(scripts)
Get database
db_meta_data = pq.get_database(database_id=37734)
st.json(db_meta_data)
Get schema
schema_meta_data = pq.get_schema(schema_id=37734)
st.json(schema_meta_data)
Get table
table_meta_data = pq.get_table(table_id=37734)
st.json(table_meta_data)
Get script
script_meta_data = pq.get_script(script_id=745)
st.json(script_meta_data)
Update database
db_meta_data = pq.update_database(database_id=1603, description="This is a database description.")
st.json(db_meta_data)
Update schema
schema_meta_data = pq.update_schema(schema_id=1603, name="my_schema")
st.json(schema_meta_data)
Update table
table_meta_data = pq.update_table(table_id=37734, description="This is a table description.")
st.json(table_meta_data)
Update field
field_meta_data = pq.update_field(field_id=829137, description="This is a field description.")
st.json(field_meta_data)
Update script
script_meta_data = pq.update_script(script_id=745, name="Create and hydrate subaccount")
st.json(script_meta_data)
Add script
pq.add_script(group_id=123, name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...', run_mode=STREAMLIT)
Supported values for run_mode are STREAMLIT, API and SHELL.
Add or update a query
Pipelines
Run pipeline
Run pipeline for all active tables in the connection:
pq.run_pipeline(connection_name = 'Hubspot', is_async = False)
Run pipeline for one table:
pq.run_pipeline(connection_name = 'Hubspot', tables = "deals")
Run pipeline for a list of tables:
pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals", "contacts"])
Run pipeline with a specific parameter (overrides connection form):
pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals"], param1 = 100)
Example where param1 is the “pagesize” in the connection form.
Materialize a table:
pq.materialize([(table_id, 'target_table_name')], 'target_schema_name')
Materialize a table to a sub-account:
pq.subaccount_materialize(connection_name = 'Hubspot', subaccount_id = 123, table_source_list = ['contacts'], target_schema = 'materialized_queries')
Get pipeline runs
pq.get_pipeline_runs(source_connection_id)
Get pipeline logs
pq.get_pipeline_logs(pipeline_run_id)
Refresh
A refresh updates the metadata in Peliqan for a connected database, for example to get an up-to-date list of the tables in that database.
Refresh database
pq.refresh_database(database_name = 'dw_12345', is_async = False)
Refresh schema
pq.refresh_schema(database_name = 'dw_12345', schema_name = 'CRM', is_async = False)
Refresh table
pq.refresh_table(database_name = 'dw_12345', schema_name = 'CRM', table_name = 'customers', is_async = False)
General
Generic API call to Peliqan backend
Example update field (column) in table:
method = 'post'
url = f"{pq.BACKEND_URL}/api/database/fields/{field_id}/"
kwargs = {
    'json': {
        'type': 'link_row',
        'fk_table_id': linked_table_id,
        'name': field_name
    }
}
pq.__service_client__.call_backend(method=method, url=url, expected_status_code=200, **kwargs)
Useful code snippets
Code to run data apps locally (outside of Peliqan) as well as on Peliqan, without any required code changes:
if 'RUN_CONTEXT' not in globals():  # Running outside of Peliqan
    import os
    import streamlit as st
    from peliqan import Peliqan
    api_key = os.getenv("PELIQAN_API_KEY")
    if not api_key:
        st.error("PELIQAN_API_KEY environment variable is not set.")
        st.stop()
    pq = Peliqan(api_key)
    RUN_CONTEXT = "interactive"

# Other code goes here
List all databases, schemas, tables in current account:
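A sketch based on the metadata calls above; the 'name' key on database dicts is an assumption, inspect the output of pq.list_databases() in your account:
databases = pq.list_databases()
for db in databases:
    st.text(db['name'])             # 'name' key assumed
    for table in db['tables']:      # tables listed per database, as in the List tables example above
        st.text("  " + table['name'])
# Schemas can be listed per database with pq.list_schemas(db_name), as shown above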
Get table fields & PK:
Apply Peliqan’s look & feel in Streamlit interactive Data apps:
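The original snippet is not included in this extract; the usual Streamlit mechanism is injecting CSS with st.markdown, a generic sketch with placeholder values (not Peliqan’s actual styling):
custom_css = """
<style>
/* placeholder values, replace with Peliqan's actual fonts and colors */
html, body, [class*="css"] { font-family: sans-serif; }
.stButton > button { border-radius: 4px; }
</style>
"""
st.markdown(custom_css, unsafe_allow_html=True)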
Logging:
import inspect

LOG_LEVEL = 100

def print_log(msg: str, level: int = 1) -> None:
    if level < LOG_LEVEL:
        caller_frame = inspect.currentframe().f_back
        line_no = caller_frame.f_lineno
        print(f"{line_no} → {msg}")

API handler Data app example: