Peliqan documentation

Reference

Data apps are interactive apps, built as single-file Python scripts, that run on Peliqan. This page provides a reference for building Data apps.

Data apps on the Peliqan platform

Peliqan is an all-in-one data platform with a wide range of components that can be controlled and used from within Data apps through Peliqan’s pq module (see the sketch after this list):

  • Connectors (ETL pipelines that sync data from sources into a data warehouse)
  • Built-in data warehouse, and the ability to connect an external data warehouse as the ETL target
  • Data explorer & SQL queries for data transformations (queries can be configured as views and/or can be materialized)
  • Trino federated query engine
  • Data replication
  • Python IDE and runtime to develop, test, host, schedule & publish Data apps
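
For example, a Data app can drive several of these components with a few pq calls. This is a minimal sketch using calls documented later on this page; 'Hubspot' and the schema/table names are placeholders:

# Query the built-in data warehouse
rows = pq.dbconnect(pq.DW_NAME).fetch(pq.DW_NAME, 'schema_name', 'table_name')

# Trigger a connector's ETL pipeline
pq.run_pipeline(connection_name='Hubspot')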

Peliqan pq module

The pq module exposes Peliqan’s low-code, data-centric and data pipeline functions. The pq module is automatically available in Peliqan when you write Python code in Data apps, and it can be used in your local IDE by running pip install peliqan and adding import peliqan as pq to your code.
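
A minimal local setup therefore looks like this (assuming a Peliqan API key is available as an environment variable; the complete snippet under Useful code snippets below also handles the Peliqan-hosted case):

# Local IDE only; inside Peliqan, pq is available automatically
import os
from peliqan import Peliqan
pq = Peliqan(os.getenv("PELIQAN_API_KEY"))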

Constants

Run modes

Data apps can run locally on a computer (external to Peliqan) or inside the Peliqan platform.

When hosted on Peliqan, some code is prepended, e.g. to provide context and to automatically import Streamlit as st and Peliqan as pq.

Constant RUN_CONTEXT is always set when running a Data app in Peliqan.

pq.DW_NAME       # Name of the default data warehouse (database) in the current account, also the name of the default data warehouse connection
pq.DW_ID         # Id of the default data warehouse in the current account
pq.INTERFACE_ID  # Current script id
RUN_CONTEXT      # Possible values: interactive, scheduled, api_handler or not set (outside of Peliqan)

# Example: detect if the script is running inside or outside of Peliqan
if 'RUN_CONTEXT' not in globals():
    print("Running outside of Peliqan")

Data apps can run interactively or in the background (e.g. scheduled). Streamlit is used to build UIs for interactive apps.

Streamlit functions such as st.text(), st.write(), st.json(), st.table() etc. are converted to print statements in background runs.

Print statements are written to the log file of each run, visible in Peliqan.
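
For instance, the same output call works in both modes (the message is illustrative):

st.write("Pipeline finished")  # rendered in the UI interactively; printed to the run log in background runs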

Data warehouse, databases and tables

Connect to data warehouse or DB

# Connect to e.g. a SQL Server, MySQL or Postgres DB added in Peliqan under "Connections"
dbconn = pq.dbconnect('connection_name')

# Connect to the built-in default data warehouse
dbconn = pq.dbconnect(pq.DW_NAME)

Read

# Use the connection to read data
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Read data from the default built-in data warehouse
rows = dbconn.fetch(pq.DW_NAME, 'schema_name', 'table_name')

# Read data using a custom query
rows = dbconn.fetch('db_name', query='SELECT * FROM schema_name.table_name')

# Get the result as a data frame
df = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)

# Options:
# df = True: return a DataFrame instead of a list of objects
# fillna_with = '': replace NaN (not a number) values with a given value, e.g. an empty string
# fillnat_with = '': replace NaT (not a time) values with a given value
# enable_python_types = True: use Python types in the response
# enable_datetime_as_string = True: return datetime columns as strings
# tz = 'UTC': timezone for datetimes

Insert

dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
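
For example, with an illustrative record:

record = {'name': 'ACME', 'email': 'info@acme.com'}
dbconn.insert('db_name', 'schema_name', 'table_name', record)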

Update

dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Upsert

Inserts or updates based on the row_pk_value.

dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

Write

Write implements full ELT pipeline logic: it casts records to a schema (if given), upserts (insert or update based on the pk value), and creates the table and columns if needed.

dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write with a schema provided
object_schema = {
    'properties': {
        'id': {'type': 'integer'},
        'name': {'type': 'string'}
    }
}
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], object_schema, pk='id')

Execute query

dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')

Federated query engine Trino

Connect Trino

trinoconn = pq.trinoconnect()

Read

rows = trinoconn.fetch('db_or_catalog_name', 'schema_name', 'table_name')

# Read data using custom query
rows = trinoconn.fetch('db_name', query='SELECT * FROM catalog.schema.table_name')

# Get result as a data frame
df = trinoconn.fetch('db_name', 'schema_name', 'table_name', df=True)

Writeback (SaaS APIs)

Writeback functions are wrappers for the APIs of connected SaaS applications.

Some SaaS connections in Peliqan are sources only (ELT pipelines), others are writeback only, and some are both.

apiconn = pq.connect('connection_name')

apiconn.add('object_type', object)   # or apiconn.add('object_type', **object)

apiconn.update('object_type', object)

apiconn.upsert('contact', contact, searchfield='email', searchterm='john@doe.com')

apiconn.delete('object_type', id=1)

apiconn.get('object_type', id=1)

apiconn.findone('object_type', searchfield='email', searchterm='john@doe.com')

apiconn.list('object_type')

apiconn.copy('object_type', id=1)

apiconn.rename('object_type', id=1, name='new name')

apiconn.apicall('path', method='get', body={})

Writeback examples:

# Send a message to a Slack channel
slack_api = pq.connect('Slack')
message = {
    'text': "Data quality alert",
    'channel': "Alerts",
    'username': "The Peliqan bot",
}
result = slack_api.add('message', message)

# Odoo: add a partner
odoo_api = pq.connect('Odoo')
partner = {
    'name': "ACME",
    'email': "info@acme.com",
}
result = odoo_api.add('partner', partner)

More info:

Writeback to SaaS APIs

SFTP

Connect SFTP

sftpconn = pq.sftpconnect('connection_name')

List files

result = sftpconn.dir('.')

Read file

result = sftpconn.read_file('./file.json')
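
If the file contains JSON, the contents can be parsed directly (this sketch assumes read_file returns the file contents as a string):

import json
data = json.loads(sftpconn.read_file('./file.json'))  # assumes a string return value
st.json(data)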

State

State is stored and maintained across app runs. For session state (within the scope of a single run), use Streamlit’s session state: st.session_state.

Get state

state = pq.get_state()

if not state:
    state = {}

settings = state.get("settings", {})

Set state

new_state = {
    "settings": { ... }
}

pq.set_state(new_state)
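
A typical pattern combines both calls to persist a value across runs (the key and value here are illustrative):

state = pq.get_state() or {}
state["last_run"] = "2025-01-01T00:00:00Z"
pq.set_state(state)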

Metadata

List databases

databases = pq.list_databases()
st.json(databases)

List schemas

schemas = pq.list_schemas(db_name)
st.json(schemas)

List tables

databases = pq.list_databases()
for db in databases:
    for table in db['tables']:
        st.text(table['name'])

List scripts (data apps)

scripts = pq.list_scripts()
st.json(scripts)

Get database

db_meta_data = pq.get_database(database_id=37734)
st.json(db_meta_data)

Get schema

schema_meta_data = pq.get_schema(schema_id=37734)
st.json(schema_meta_data)

Get table

table_meta_data = pq.get_table(table_id=37734)
st.json(table_meta_data)

Get script

script_meta_data = pq.get_script(script_id=745)
st.json(script_meta_data)

Update database

db_meta_data = pq.update_database(database_id=1603, description="This is a database description.")
st.json(db_meta_data)

Update schema

schema_meta_data = pq.update_schema(schema_id=1603, name="my_schema")
st.json(schema_meta_data)

Update table

table_meta_data = pq.update_table(table_id=37734, description="This is a table description.")
st.json(table_meta_data)

Update field

field_meta_data = pq.update_field(field_id=829137, description="This is a field description.")
st.json(field_meta_data)

Update script

script_meta_data = pq.update_script(script_id=745, name="Create and hydrate subaccount")
st.json(script_meta_data)

Add script

pq.add_script(group_id=123, name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...')
pq.add_script(group_name='Production data', name='my_new_script', raw_script='...', run_mode=STREAMLIT)

Supported run_mode values are STREAMLIT, API and SHELL.

Add or update a query

pq.add_query(name='My query', query='SELECT * from odoo.accounts', as_view=False)
pq.upsert_query(name='My query', query='SELECT * from odoo.accounts', schema_id=123)

Replicate a table or materialize a query

# Example one-time replicate to the built-in DWH
dbconn = pq.dbconnect(pq.DW_NAME)  # e.g. dw_123 (connection name & DWH name)
result = dbconn.replicate(db_name=pq.DW_NAME, table_source_list=[(source_table_id, target_table_name), (source_table_id, target_table_name)], target_schema='my_target_schema')
st.write(result)

# Example one-time replicate to an external target
dbconn = pq.dbconnect('MS SQL Server')
result = dbconn.replicate(db_name='my_target_db', table_source_list=[(source_table_id, target_table_name), (source_table_id, target_table_name)], target_schema='my_target_schema')
st.write(result)

# Example scheduled replicate to the built-in DWH
replicate_settings = {
    "schedule": {
        "interval": 3600,                                  # 3600 (hourly), 21600 (every 6 hours), 86400 (daily)
        "weekdays": [0, 1, 2, 3, 4, 5, 6],                 # 0 = Monday
        "start_time": "03:30:00"                           # Only used when interval = 86400 (24 hour interval)
    },
    "source_incremental_field": "timestamp_last_update",   # optional field, value should be an ISO 8601 timestamp
    "target_table_name":        "replicated_report",       # required field
    "target_schema_name":       "my_replicated_data",      # required field
    "target_database_id":       pq.DW_ID,                  # required field
    "target_connection_name":   pq.DW_NAME,                # required field
}
result = pq.replicate(table_id=source_table_id, enabled=True, settings=replicate_settings)
st.write(result)

# Example scheduled replicate to an external target
replicate_settings = {
    "schedule": {
        "interval": 3600,
        "weekdays": [0, 1, 2, 3, 4, 5, 6],
        "start_time": "03:30:00"
    },
    "source_incremental_field": "timestamp_last_update",
    "target_table_name":        "replicated_report",
    "target_schema_name":       "my_replicated_data",
    "target_database_id":       123,                       # required field: id of the external target database
    "target_connection_name":   "SQL Server",              # required field: name of the external target connection
}
result = pq.replicate(table_id=source_table_id, enabled=True, settings=replicate_settings)
st.write(result)

# Replicate/materialize to a subaccount
result = pq.subaccount_materialize(connection_name='Hubspot', subaccount_id=123, table_source_list=['contacts'], target_schema='materialized_queries')
st.write(result)

Pipelines

Run pipeline

Run pipeline for all active tables in the connection:

pq.run_pipeline(connection_name='Hubspot', is_async=False)

Run pipeline for one table:

pq.run_pipeline(connection_name = 'Hubspot', tables = "deals")

Run pipeline for a list of tables:

pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals", "contacts"])

Run pipeline with a specific parameter (overrides connection form):

pq.run_pipeline(connection_name = 'Hubspot', tables = ["deals"], param1 = 100)

In this example, param1 overrides the “pagesize” parameter in the connection form.

Get pipeline runs

pq.get_pipeline_runs(source_connection_id)

Get pipeline logs

pq.get_pipeline_logs(pipeline_run_id)
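
For example, to inspect the recent runs of a connection (assuming source_connection_id holds the id of a connection):

runs = pq.get_pipeline_runs(source_connection_id)
st.json(runs)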

Refresh

A refresh updates the metadata in Peliqan for a connected database, e.g. to fetch an up-to-date list of the tables in a database.

Refresh database

pq.refresh_database(database_name='dw_12345', is_async=False)

Refresh schema

pq.refresh_schema(database_name='dw_12345', schema_name='CRM', is_async=False)

Refresh table

pq.refresh_table(database_name='dw_12345', schema_name='CRM', table_name='customers', is_async=False)

General

Generic API call to Peliqan backend

Example update field (column) in table:

method = 'post'
url = f"{pq.BACKEND_URL}/api/database/fields/{field_id}/"
kwargs = {
    'json': {
        'type': 'link_row',
        'fk_table_id': linked_table_id,
        'name': field_name
    }
}
pq.__service_client__.call_backend(method=method, url=url, expected_status_code=200, **kwargs)

Useful code snippets

Code to run data apps locally (outside of Peliqan) as well as on Peliqan, without any required code changes:

if 'RUN_CONTEXT' not in globals(): # Running outside of Peliqan
    import os
    from peliqan import Peliqan
    import streamlit as st
    api_key = os.getenv("PELIQAN_API_KEY")
    if not api_key:
        st.error("PELIQAN_API_KEY environment variable is not set.")
        st.stop()
    pq = Peliqan(api_key)
    RUN_CONTEXT = "interactive"

# Other code goes here

List all databases, schemas, tables in current account:

def get_tables():
    """Get all tables from all databases in the current account"""
    all_tables = []
    for db in pq.list_databases():
        for table in db["tables"]:
            schema_name = next((s["name"] for s in db["schemas"] if s["id"] == table["schema_id"]), None)
            all_tables.append({
                "db_id": db["id"],
                "schema_id": table["schema_id"],
                "table_id": table["id"],
                "db": db["name"],
                "schema": schema_name,
                "table": table["name"]
            })
    return all_tables

Get table fields & PK:

def get_table_columns_and_pk(table_id):
    """Get all text columns and the primary key of a table"""
    table_meta = pq.get_table(table_id)
    columns = table_meta.get("all_fields", [])
    text_columns = [col["name"] for col in columns
                    if col.get("type") in ["text", "varchar", "string"]
                    and not col["name"].startswith("_sdc")]
    pk = next((col["name"] for col in columns if col.get("is_pk")), "id")
    return text_columns, pk

Apply Peliqan’s look & feel in Streamlit interactive Data apps:

def apply_styling():
    # st.set_page_config must be the first Streamlit call in the app
    st.set_page_config(
        page_title="Peliqan Data App",
        page_icon="",
        layout="wide",
        initial_sidebar_state="auto"
    )

    st.markdown("""
        <link rel="preconnect" href="https://fonts.googleapis.com">
        <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
        <link href="https://fonts.googleapis.com/css2?family=Asap:ital,wght@0,100..900;1,100..900&family=Roboto:ital,wght@0,100..900;1,100..900&display=swap" rel="stylesheet">
        <link href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined" rel="stylesheet"/>

        <style>
            html, body, h4, [class*="st-"]:not([data-testid="stIconMaterial"]), .main, .stApp {
                font-family: 'Roboto', sans-serif !important;
                font-size: 14px !important;
            }

            #stDecoration {
                display: none;
            }

            h1, .main-header {
                font-family: 'Roboto', sans-serif !important;
                font-size: 28px !important;
                color: #1f77b4;
                margin-top: 1rem;
                margin-bottom: 1rem;
            }

            h2, .sub-header {
                font-family: 'Roboto', sans-serif !important;
                font-size: 24px !important;
                color: #1f77b4;
                margin-top: 1rem;
                margin-bottom: 1rem;
            }

            h3 {
                font-family: 'Roboto', sans-serif !important;
                font-size: 20px !important;
                color: #1f77b4;
                margin-top: 1rem;
                margin-bottom: 1rem;
            }

            .stMainBlockContainer {
                padding-top: 1.5rem;
                padding-bottom: 1rem;
            }
        </style>
        """,
        unsafe_allow_html=True,
    )

Logging:

import inspect

LOG_LEVEL = 100
def print_log(msg: str, level: int = 1) -> None:
    """Print msg prefixed with the caller's line number, when level is below LOG_LEVEL"""
    if level < LOG_LEVEL:
        caller_frame = inspect.currentframe().f_back
        line_no = caller_frame.f_lineno
        print(f"{line_no} → {msg}")

API handler Data app example:

def handler(request):
    # Get request details:
    method = request['method']
    url = request['url']
    path = request['path']
    query_string = request['query_string']
    headers = request['headers']
    data = request['data']
    form = request['form']

    # Example: read a parameter from the query string
    from urllib.parse import parse_qs
    query_string_dict = parse_qs(query_string)
    if "param1" in query_string_dict:
        param1 = query_string_dict['param1'][0]

    # Example: read data from an incoming POST body with JSON
    import json
    postbody_obj = json.loads(data)

    # Example response: read rows from a table (pq = Peliqan module)
    dbconn = pq.dbconnect('dw_2')
    table_rows = dbconn.fetch('dw_2', 'salesforce', 'accounts')

    # Send the API response (e.g. return the table contents)
    return table_rows
