You can build data apps using Peliqan's low-code solution with Python scripts. Use the module pq to access built-in Peliqan functions. Use the module st (Streamlit) to show output and data (e.g. in a chart or table) and to add UI elements (buttons, drop-downs, etc.).
Loading data
You can load data from any table into your Python code:
# Open a connection to the Peliqan data warehouse
dbconn = pq.dbconnect('dw_123') # See your DW name under My Connections
# Open a connection to a connected database
dbconn = pq.dbconnect('connection_name') # See connection name under My Connections
# Load a table
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')
# Using a custom SQL SELECT query
rows = dbconn.fetch('db_name', query = 'SELECT * FROM schema_name.table_name')
# Get a dataframe as response
rows = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)
# More options for dbconn.fetch():
#
# df = True: set to True to return a Dataframe instead of a list of objects
# fillna_with = '': replace NaN (not a number) values with a given value, e.g. an empty string
# fillnat_with = '': replace NaT (not a time) values with a given value
# enable_python_types = True: use Python types in response
# enable_datetime_as_string = True: return datetime columns as string
# tz='UTC': timezone for datetimes
The response rows is a list of objects, or a Pandas dataframe when df=True is used.
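The two response shapes can be illustrated with plain Python and pandas; the sample data below is hypothetical and stands in for what dbconn.fetch() would return:

```python
import pandas as pd

# Without df=True: a list of dicts, one dict per row
rows = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]
value = rows[0]["name"]  # access a cell by row index and column name

# With df=True: the same data as a pandas dataframe
df = pd.DataFrame(rows)
same_value = df["name"].iloc[0]  # access the same cell via the dataframe

print(value, same_value)  # both print "Alice"
```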
Working with data
dbconn = pq.dbconnect('dw_123')
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')
# Select the first row
row = rows[0]
# Select one element (cell value) in a row
value = rows[0]["column_name"] # first row
value = rows[1]["column_name"] # second row
# Loop over all rows:
for row in rows:
    st.text(row) # print the row
# Loop over all rows and access one element
for row in rows:
    st.text(row["column_name"])
# Loop over the column names of one row
row = rows[0]
for column_name in row:
    st.text(column_name)
# Loop over all elements of a row and get both key & value
row = rows[0]
for key, val in row.items():
    st.text('Key %s has value %s' % (key, val))
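Since each row is a dict, standard Python dict patterns apply. As a hypothetical example (the column names id and name are assumptions, not columns from your table), rows can be indexed by their primary key for fast lookups:

```python
# Sample rows standing in for the result of dbconn.fetch()
rows = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]

# Build a lookup dict keyed by the primary key column
rows_by_id = {row["id"]: row for row in rows}

print(rows_by_id[2]["name"])  # prints "Bob"
```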
Working with dataframes
Here are basic examples of working with dataframes:
data = dbconn.fetch('db_name', 'schema_name', 'table_name', df=True)
# Select only a few columns:
data2 = data[["column1", "column2"]]
# Select the first row
row = data.iloc[0]
# Select the first element in a column
value = data["column_name"].iloc[0]
# Loop over all rows:
for i, row in data.iterrows():
    st.text(row) # print the row
# Loop over all rows and access each element (also loop over columns):
for i, row in data.iterrows():
    for col in row:
        st.text(col) # print the element (col)
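Beyond row iteration, dataframes support vectorized filtering and aggregation. A minimal sketch with sample data (the columns status and amount are hypothetical, not columns from your table):

```python
import pandas as pd

# Sample data standing in for dbconn.fetch(..., df=True)
data = pd.DataFrame([
    {"status": "open", "amount": 100},
    {"status": "closed", "amount": 250},
    {"status": "open", "amount": 50},
])

# Filter rows with a boolean mask
open_rows = data[data["status"] == "open"]

# Aggregate per group
totals = data.groupby("status")["amount"].sum()

print(len(open_rows))  # 2
print(totals["open"])  # 150
```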
Writing to tables
Here are basic examples of updating data in tables:
dbconn = pq.dbconnect('dw_123')
# Insert a row in a table (record_dict is a dict of column names and values, e.g. {'id': 1, 'name': 'John'})
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
Examples of using dbconn.write(), which uses pipeline logic to write rows to a target table in a data warehouse. This function creates the table if it does not exist, and adds or alters columns when needed:
dbconn = pq.dbconnect('dw_123')
# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')
# Write a record (object, dict) to a table - example
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')
# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write a dataframe to a table
records_list = df.to_dict(orient = 'records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write a record to a table with a schema definition
object_schema = {'properties': {'id': {'type': 'integer'}, 'name': {'type': 'string'}}}
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], object_schema, pk='id')
# Write in batch
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk = 'id')
Difference between dbconn.upsert() and dbconn.write():
| | dbconn.upsert() | dbconn.write() |
| --- | --- | --- |
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes. If a record contains new keys, the columns are automatically added to the table. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged in the table. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Updates metadata columns: _sdc_batched_at (timestamp of update), _sdc_sequence, etc. |
| No unneeded updates (*) | Yes. | Yes. |
(*) A record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily, which is important in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target (it avoids constantly triggering updates/syncs).
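The "updates vs. replaces" difference can be illustrated with plain dicts; this is a simplified simulation of the table behavior, not Peliqan code:

```python
# Existing row in the table
existing = {"id": 1, "name": "John", "email": "john@example.com"}

# Incoming record that only carries some of the columns
incoming = {"id": 1, "name": "Johnny"}

# upsert() behavior: merge - columns missing from the record keep their value
upserted = {**existing, **incoming}

# write() behavior: replace - columns missing from the record become empty
written = {col: incoming.get(col) for col in existing}

print(upserted["email"])  # "john@example.com" (unchanged)
print(written["email"])   # None (column emptied)
```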
Building a UI
Peliqan uses Streamlit as the front-end for your Python scripts. Examples:
st.title("My title")
st.text("My text")
st.line_chart(data)
dbconn = pq.dbconnect('dw_123')
rows = dbconn.fetch('db_name', 'schema_name', 'table_name')
st.table(rows)
st.json(rows)
st.dataframe(rows)
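st.line_chart works best with a dataframe whose index is the x-axis and whose columns are the series to plot. A sketch of shaping fetched rows for charting (the column names month and revenue are hypothetical); the st calls are shown as comments because they require a running Streamlit app:

```python
import pandas as pd

# Sample rows standing in for dbconn.fetch(...)
rows = [
    {"month": "2024-01", "revenue": 100},
    {"month": "2024-02", "revenue": 140},
    {"month": "2024-03", "revenue": 120},
]

# Shape the data: one row per x-axis point, indexed by the x column
chart_data = pd.DataFrame(rows).set_index("month")

# st.line_chart(chart_data)  # plot revenue per month
# st.dataframe(chart_data)   # show the underlying table
print(chart_data["revenue"].max())  # 140
```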