Basics of data apps

You can build data apps using Peliqan’s low-code solution with Python scripts.

Use the module pq to access built-in Peliqan functions.

Use the module st (Streamlit) to show output and data (e.g. in a chart or table) and to add UI elements (buttons, dropdowns, etc.).
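
For example, a minimal data app combines both modules. This is a sketch: the connection, database, schema and table names are placeholders that you should replace with your own.

# Load a table and show it in the UI
dbconn = pq.dbconnect('dw_123')
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

st.title("My first data app")
st.table(rows)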

Loading data

You can load data from any table into your Python code:

# Open a connection to the Peliqan data warehouse
dbconn = pq.dbconnect('dw_123') # See your DW name under My Connections

# Open a connection to a connected database
dbconn = pq.dbconnect('connection_name') # See connection name under My Connections

# Load a table
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Using a custom SQL SELECT query
rows = dbconn.fetch('db_name', query = 'SELECT * FROM schema_name.table_name')

# Get a dataframe as response
rows = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)

# More options for dbconn.fetch():
#
# df = True: set to True to return a Pandas dataframe instead of a list of objects
# fillna_with = '': replace NaN (not a number) values with a given value, e.g. an empty string
# fillnat_with = '': replace NaT (not a time) values with a given value
# enable_python_types = True: use Python types in the response
# enable_datetime_as_string = True: return datetime columns as strings
# tz = 'UTC': timezone for datetimes
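
Several of these options can be combined in one call. A sketch, reusing the placeholder names from above:

# Fetch a dataframe with empty values filled in and datetimes
# returned as strings in UTC
rows = dbconn.fetch('db_name', 'schema_name', 'table_name',
                    df=True, fillna_with='', fillnat_with='',
                    enable_datetime_as_string=True, tz='UTC')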

The response rows is a list of objects (dicts); when df=True is used, the response is a Pandas dataframe.

Working with data

dbconn = pq.dbconnect('dw_123') 
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

# Select the first row
row = rows[0]

# Select one element (cell value) in a row
value = rows[0]["column_name"]  # first row
value = rows[1]["column_name"]  # second row

# Loop over all rows:
for row in rows:
    st.text(row)  # print the row

# Loop over all rows and access one element
for row in rows:
    st.text(row["column_name"])

# Loop over the column names of one row
row = rows[0]
for column_name in row:
    st.text(column_name)

# Loop over all elements of a row and get both key & value
row = rows[0]
for key, val in row.items():
    st.text('Key %s has value %s' % (key, val))

Working with dataframes

Here are basic examples of working with dataframes:

data = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)

# Select only a few columns:
data2 = data[["column1", "column2"]]

# Select the first row
row = data.iloc[0]

# Select the first element in a column
value = data["column_name"].iloc[0]

# Loop over all rows:
for i, row in data.iterrows():
    st.text(row)  # print the row

# Loop over all rows and access each element (also loop over columns):
for i, row in data.iterrows():
    for col in row:
        st.text(col)  # print the element (col)

Writing to tables

Here are basic examples of updating data in tables:

dbconn = pq.dbconnect('dw_123')

# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
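
For illustration, record_dict in the calls above is a plain Python dict that maps column names to values. A sketch using a hypothetical 'customers' table with columns id, name and email:

record_dict = {'name': 'John', 'email': 'john@example.com'}

# Insert the record as a new row
dbconn.insert('db_name', 'schema_name', 'customers', record_dict)

# Update the row whose primary key value is 1: only the given columns change
dbconn.update('db_name', 'schema_name', 'customers', 1, {'email': 'john.doe@example.com'})

# Upsert: updates the row if PK 1 exists, inserts a new row otherwise
dbconn.upsert('db_name', 'schema_name', 'customers', 1, {'name': 'John', 'email': 'john.doe@example.com'})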

Examples of using dbconn.write(), which uses pipeline logic to write rows to a target table in a data warehouse. This function creates the table if it does not exist, and adds or alters columns when needed:

dbconn = pq.dbconnect('dw_123')

# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')

# Write a record to a table - example (a single record passed as a list)
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write a dataframe to a table
records_list = df.to_dict(orient = 'records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write a record to a table with a schema definition
object_schema = {'properties': {'id': {'type': 'integer'}, 'name': {'type': 'string'}}}
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], object_schema, pk='id')

# Write in batch
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')

Difference between dbconn.upsert() and dbconn.write():

|  | dbconn.upsert() | dbconn.write() |
| --- | --- | --- |
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes: if a record contains new keys, the columns are automatically added to the table. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged in the table. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Updates metadata columns such as _sdc_batched_at (timestamp of the update), _sdc_sequence, etc. |
| No unneeded updates (*) | Yes. | Yes. |

(*) The record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily, which is important in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target (it avoids constantly re-triggering updates/syncs).
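
To make the difference concrete, here is a sketch using the same hypothetical 'customers' table with columns id, name and email, where the row with PK value 1 currently has name 'John' and a filled-in email:

dbconn = pq.dbconnect('dw_123')

# upsert() only touches the columns you pass: email keeps its value
dbconn.upsert('db_name', 'schema_name', 'customers', 1, {'name': 'Johnny'})

# write() replaces the entire row: email becomes empty because it is
# not present in the record
dbconn.write('schema_name', 'customers', [{'id': 1, 'name': 'Johnny'}], pk='id')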

Building a UI

Peliqan uses Streamlit as the front-end for your Python scripts. Examples:

st.title("My title")
st.text("My text")
st.line_chart(data)

dbconn = pq.dbconnect('dw_123') 
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')

st.table(rows)
st.json(rows)
st.dataframe(rows)
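
UI elements such as buttons and dropdowns use the standard Streamlit widgets. A sketch (the table and column names are placeholders):

data = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)

# Let the user pick a column and render it as a line chart
column = st.selectbox("Choose a column", data.columns)
if st.button("Show chart"):
    st.line_chart(data[column])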