# This script deletes rows from the data warehouse (DWH) that were deleted in the source system, Robaws.
# For regular resources it fetches the list of deleted records from the Robaws API.
# For line items it fetches all current records from the API and compares their IDs with the DWH.
# Note that the regular ETL pipelines do not delete rows that were deleted in the source.
# Connections shared by the functions in this script.
# NOTE(review): `pq` (like `st`) is not imported in the visible code; it is
# presumably provided by the hosting environment. Confirm.
dbconn = pq.dbconnect(pq.DW_NAME)
robaws_api = pq.connect("Robaws")
def remove_deleted_records(resource):
    """Delete DWH rows for records that the Robaws API reports as deleted.

    Pages through ``<resource>?page=N&size=100&searchText=DELETED`` until an
    empty page comes back and collects the returned IDs. It then deletes those
    IDs from ``robaws.<resource with '-' replaced by '_'>``. Progress is
    reported through ``st`` (presumably Streamlit).

    Args:
        resource: Robaws API resource name, e.g. ``"projects"`` or
            ``"sales-invoices"``.

    Raises:
        SystemExit: if more than 100 pages of deleted records are returned,
            which is treated as a sign that something is wrong.
        ValueError: if an ID to delete is not an integer. IDs are inlined
            into the SQL statement, so they are validated first.
    """
    st.header(resource)
    # Fetch deleted records from the API, one page at a time.
    ids_to_delete = []
    page = 0
    while True:
        result = robaws_api.get(
            'API_call', path=f"{resource}?page={page}&size=100&searchText=DELETED"
        )
        items = result["items"]
        if not items:
            break
        ids_to_delete.extend(item["id"] for item in items)
        page += 1
        if page > 100:
            st.warning("More than 100 pages in deleted data, that feels incorrect, stopping.")
            # exit() is meant for the interactive interpreter only (and also
            # closes stdin); raising SystemExit stops the script the same way.
            raise SystemExit

    # Delete in DWH
    table_name = resource.replace("-", "_")
    st.write(f"{len(ids_to_delete)} IDs to delete in DWH in table {table_name}:")
    # str() so the display also works when the API returns integer IDs
    # (str.join only accepts strings).
    st.write(", ".join(str(record_id) for record_id in ids_to_delete))
    if not ids_to_delete:
        st.write("Nothing to delete in the DWH.")
    else:
        # The IDs are placed unquoted into the SQL statement. int() rejects
        # anything that is not a plain integer instead of splicing it in.
        sql_ids = ", ".join(str(int(record_id)) for record_id in ids_to_delete)
        query = f"DELETE FROM robaws.{table_name} WHERE ID IN ({sql_ids});"
        dbconn.execute(pq.DW_NAME, query=query)
        st.write(f"Deleted {len(ids_to_delete)} records in the DWH in table {table_name}.")
def find_stale_ids(ids_from_dwh, ids_from_api):
    """Return the IDs in ``ids_from_dwh`` that do not occur in ``ids_from_api``.

    The result keeps the order of ``ids_from_dwh``. IDs are compared by their
    string form, so an ID stored as an integer in the DWH still matches the
    same ID delivered as a string by the API, and vice versa. The API IDs are
    put in a set once, so the comparison is linear rather than quadratic in
    the number of line items.
    """
    api_ids = {str(api_id) for api_id in ids_from_api}
    return [dwh_id for dwh_id in ids_from_dwh if str(dwh_id) not in api_ids]


def remove_deleted_line_items(parent_resource, max_to_delete=100):
    """Delete DWH line items whose IDs no longer appear in the Robaws API.

    Fetches every record of ``parent_resource + "s"`` with
    ``include=lineItems`` (100 per page) and collects all line-item IDs. It
    then reads every ID from
    ``robaws.<parent_resource with '-' replaced by '_'>_line_items`` and
    deletes the DWH rows whose ID the API no longer returns.

    Args:
        parent_resource: singular resource name, e.g. ``"sales-invoice"``;
            an ``"s"`` is appended to form the API path.
        max_to_delete: safety limit. If more rows than this would be deleted,
            nothing is deleted and a warning message is written instead.
            Defaults to 100.

    Raises:
        SystemExit: if more than 1000 pages of records are returned.
        ValueError: if an ID to delete is not an integer. IDs are inlined
            into the SQL statement, so they are validated first.
    """
    resource = parent_resource + "s"
    st.header(resource + " line items")
    # Fetch all records (with their line items) from the API, page by page,
    # until an empty page is returned.
    ids_from_api = []
    page = 0
    while True:
        result = robaws_api.get(
            'API_call', path=f"{resource}?include=lineItems&page={page}&size=100"
        )
        items = result["items"]
        if not items:
            break
        for item in items:
            ids_from_api.extend(line_item["id"] for line_item in item["lineItems"])
        page += 1
        if page > 1000:
            st.warning("More than 1000 pages in data, that feels incorrect, stopping.")
            # exit() is meant for the interactive interpreter only (and also
            # closes stdin); raising SystemExit stops the script the same way.
            raise SystemExit

    # Fetch ids from the DWH
    table_name = parent_resource.replace("-", "_") + "_line_items"
    rows = dbconn.fetch(pq.DW_NAME, query=f"SELECT id FROM robaws.{table_name}")
    ids_from_dwh = [row["id"] for row in rows]

    # Compare
    ids_to_delete = find_stale_ids(ids_from_dwh, ids_from_api)
    st.write(f"Comparing {len(ids_from_api)} IDs from the API with {len(ids_from_dwh)} IDs in the DWH.")

    # Delete in DWH
    st.text(f"{len(ids_to_delete)} IDs to delete in DWH in table {table_name}:")
    # str() so integer IDs from the DWH can be displayed (str.join needs strings).
    st.write(", ".join(str(line_item_id) for line_item_id in ids_to_delete))
    if not ids_to_delete:
        st.write("Nothing to delete in the DWH.")
    elif len(ids_to_delete) > max_to_delete:
        # Safety net: a very large difference more likely means an incomplete
        # API fetch than real deletions.
        st.write(
            f"Stopping, too many IDs to delete ({len(ids_to_delete)}), "
            f"max_to_delete={max_to_delete}, perhaps something went wrong."
        )
    else:
        # The IDs are placed unquoted into the SQL statement. int() rejects
        # anything that is not a plain integer instead of splicing it in.
        sql_ids = ", ".join(str(int(line_item_id)) for line_item_id in ids_to_delete)
        query = f"DELETE FROM robaws.{table_name} WHERE ID IN ({sql_ids});"
        dbconn.execute(pq.DW_NAME, query=query)
        st.write(f"Deleted {len(ids_to_delete)} records in the DWH in table {table_name}.")
# Resources whose deletions are listed by the Robaws API's DELETED search,
# processed in this fixed order.
for _resource in ("projects", "clients", "sales-invoices"):
    remove_deleted_records(_resource)
# Line items are reconciled by comparing all API records with the DWH.
remove_deleted_line_items("sales-invoice")