import json
from datetime import datetime, timezone
# --- Utility Functions ---
def fetch_subaccounts(pq):
    """Return every subaccount of ``pq`` that has not expired yet.

    Pages through ``pq.list_subaccounts(page=...)`` starting at page 1 and
    stops at the first response without a truthy ``"next"`` entry. A response
    delivered as a string is decoded as JSON before use.

    Args:
        pq: Client object exposing ``list_subaccounts(page=...)``.

    Returns:
        A list of ``{"id", "name", "owner_id"}`` dicts, one per subaccount
        whose ``expires_on`` is missing/empty or not earlier than the current
        UTC time. ``name`` falls back to ``"Unnamed Subaccount"`` and
        ``owner_id`` is ``None`` when no owner is present.

    Raises:
        KeyError: If a subaccount entry has no ``"id"``.
        ValueError: If ``expires_on`` is not an ISO-8601 timestamp.
    """
    current_time = datetime.now(timezone.utc)

    def _is_active(item):
        expires_on = item.get("expires_on")
        if not expires_on:
            return True
        # A trailing "Z" is rewritten because fromisoformat() only accepts it
        # natively on newer Python versions.
        expiry = datetime.fromisoformat(expires_on.replace("Z", "+00:00"))
        if expiry.tzinfo is None:
            # Comparing a naive timestamp with the aware "now" would raise
            # TypeError; a timestamp without an offset is treated as UTC.
            expiry = expiry.replace(tzinfo=timezone.utc)
        return expiry >= current_time

    all_subaccounts = []
    page = 1
    while True:
        result = pq.list_subaccounts(page=page)
        if isinstance(result, str):
            result = json.loads(result)
        # "or []" also covers a response that carries "data": null.
        all_subaccounts.extend(result.get("data") or [])
        if not result.get("next"):
            break
        page += 1

    return [
        {
            "id": item["id"],
            "name": item.get("name", "Unnamed Subaccount"),
            # "owner" may be present but null, so .get("owner", {}) is not enough.
            "owner_id": (item.get("owner") or {}).get("id"),
        }
        for item in all_subaccounts
        if _is_active(item)
    ]
def fetch_schemas(instance):
    """List the schemas of every database reported by ``instance``.

    Args:
        instance: Object exposing ``list_databases()``; each database is read
            with ``.get("schemas", [])``.

    Returns:
        A ``(schemas, databases)`` tuple. ``schemas`` is a flat list of
        ``{"id", "name"}`` dicts (``name`` defaults to ``"Unnamed Schema"``);
        ``databases`` is the unmodified result of ``list_databases()``.
    """
    databases = instance.list_databases()
    flattened = [
        {"id": entry.get("id"), "name": entry.get("name", "Unnamed Schema")}
        for database in databases
        for entry in database.get("schemas", [])
    ]
    return flattened, databases
def fetch_tables(databases, schema_id):
    """Collect the tables that belong to ``schema_id``.

    Args:
        databases: Iterable of database dicts; tables are read from each
            one's ``"tables"`` entry (missing entry means no tables).
        schema_id: Value compared against each table's ``"schema_id"``.

    Returns:
        A list of ``{"id", "name"}`` dicts for the matching tables, in
        encounter order; ``name`` defaults to ``"Unnamed Table"``.
    """
    return [
        {"id": entry.get("id"), "name": entry.get("name", "Unnamed Table")}
        for database in databases
        for entry in database.get("tables", [])
        if entry.get("schema_id") == schema_id
    ]
def get_primary_key(table_id, databases):
    """Look up the ``primary_field_name`` of the table with ``table_id``.

    Args:
        table_id: Value compared against each table's ``"id"``.
        databases: Iterable of database dicts with an optional ``"tables"`` list.

    Returns:
        The ``"primary_field_name"`` value of the first matching table (which
        may itself be ``None``), or ``None`` when no table matches.
    """
    candidates = (
        entry.get("primary_field_name")
        for database in databases
        for entry in database.get("tables", [])
        if entry.get("id") == table_id
    )
    return next(candidates, None)
def load_state(direction_key):
    """Return the saved selections stored under ``direction_key``.

    Reads the whole state via ``pq.get_state()``; yields an empty dict when
    no state exists or the key is absent.
    """
    stored = pq.get_state()
    if not stored:
        return {}
    return stored.get(direction_key, {})
def save_state(direction_key, data):
    """Store ``data`` under ``direction_key`` without dropping other keys.

    The current state is read with ``pq.get_state()`` (an empty dict is used
    when nothing is stored), updated in place, and written back through
    ``pq.set_state()``.
    """
    merged = pq.get_state()
    if not merged:
        merged = {}
    merged[direction_key] = data
    pq.set_state(merged)
def sync_sub2parent_from_state(subaccount_id, sub_schema_name, selected_table_names, target_schema_name, target_db_name):
    """Copy selected tables from a subaccount schema into the parent account.

    Rows are read through the subaccount connection ``dw_<subaccount_id>`` in
    pages of 500 and, once a table has been read completely, written with
    ``pq.dbconnect(pq.DW_NAME).write(...)`` into ``target_schema_name`` under
    the same table name, passing the table's ``primary_field_name`` as ``pk``.

    Returns without doing anything when ``sub_schema_name`` is not found in
    the subaccount or none of the requested tables exist in it. Tables that
    yield no rows are skipped.

    Args:
        subaccount_id: Account id passed to ``pq.get_subaccount_instance``.
        sub_schema_name: Name of the source schema inside the subaccount.
        selected_table_names: Names of the tables to copy; ``None`` is
            treated as "no tables".
        target_schema_name: Schema in the parent warehouse to write into.
        target_db_name: Accepted for call compatibility; not used here, the
            write always goes through ``pq.DW_NAME``.
    """
    def _quote_ident(identifier):
        # Double any embedded quote so the name stays one SQL identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    subaccount_instance = pq.get_subaccount_instance(account_id=subaccount_id)
    schemas, databases = fetch_schemas(subaccount_instance)
    selected_schema = next((s for s in schemas if s["name"] == sub_schema_name), None)
    if not selected_schema:
        return

    # Saved state may hold None for the table list; `in None` would raise.
    wanted_names = set(selected_table_names or [])
    selected_tables = [
        t for t in fetch_tables(databases, selected_schema["id"])
        if t["name"] in wanted_names
    ]
    if not selected_tables:
        return

    dw_name = f'dw_{subaccount_id}'
    sub_account_dbconn = subaccount_instance.dbconnect(dw_name)
    parent_dbconn = None  # opened lazily, then reused for every table
    page_size = 500

    for table in selected_tables:
        table_name = table["name"]
        pk = get_primary_key(table["id"], databases)
        # Without ORDER BY the row order of LIMIT/OFFSET pages is unspecified,
        # so consecutive pages could repeat or miss rows. Ordering is only
        # possible when a primary field is known.
        order_clause = f' ORDER BY {_quote_ident(pk)}' if pk else ''
        source = f'{_quote_ident(sub_schema_name)}.{_quote_ident(table_name)}'

        offset = 0
        all_rows = []
        while True:
            query = f'SELECT * FROM {source}{order_clause} LIMIT {page_size} OFFSET {offset}'
            rows = sub_account_dbconn.fetch(dw_name, query=query)
            if not rows:
                break
            all_rows.extend(rows)
            offset += page_size

        if not all_rows:
            continue

        if parent_dbconn is None:
            parent_dbconn = pq.dbconnect(pq.DW_NAME)
        parent_dbconn.write(target_schema_name, table_name, records=all_rows, pk=pk)
def materialize_parent2sub_from_state(subaccount_id, subaccount_owner_id, subaccount_schema, source_db_name, source_schema_name, selected_table_names):
    """Materialize selected parent-account tables into a subaccount schema.

    Resolves the requested tables inside the parent database whose server
    ``friendly_name`` equals ``source_db_name`` and the schema named
    ``source_schema_name``, then hands them to ``pq.subaccount_materialize``.
    The call is made even when no table could be resolved, in which case the
    table list is empty.

    Args:
        subaccount_id: Used to build the ``dw_<subaccount_id>`` connection
            and database names.
        subaccount_owner_id: Passed to ``pq.subaccount_materialize`` as
            ``subaccount_id``.
        subaccount_schema: Target schema inside the subaccount.
        source_db_name: Friendly name of the parent source database.
        source_schema_name: Name of the source schema in that database.
        selected_table_names: Names of the tables to materialize; ``None``
            is treated as "no tables".
    """
    def _is_source_db(db):
        # Databases without a "server" entry can never be the source; using
        # .get avoids a KeyError on them.
        server = db.get("server") or {}
        return source_db_name is not None and server.get("friendly_name") == source_db_name

    parent_databases = pq.list_databases()
    matching_dbs = [db for db in parent_databases if _is_source_db(db)]

    # The schema is looked up in the first matching database only.
    source_schemas = matching_dbs[0].get("schemas", []) if matching_dbs else []
    selected_source_schema = next((s for s in source_schemas if s["name"] == source_schema_name), None)

    # Saved state may hold None for the table list; `in None` would raise.
    wanted_names = set(selected_table_names or [])
    source_tables = []
    if selected_source_schema:
        schema_id = selected_source_schema["id"]
        source_tables = [
            (table["id"], table["name"])
            for db in matching_dbs
            for table in db.get("tables", [])
            if table.get("schema_id") == schema_id and table["name"] in wanted_names
        ]

    pq.subaccount_materialize(
        connection_name=f'dw_{subaccount_id}',
        database_name=f'dw_{subaccount_id}',
        subaccount_id=subaccount_owner_id,
        table_source_list=source_tables,
        target_schema=subaccount_schema
    )
# Entry point. RUN_CONTEXT, pq and st are not defined in this file and are
# expected to be provided by the runtime.
#   "interactive": render the selection UI, optionally save the selections
#                  and/or run the chosen sync immediately.
#   "scheduled":   replay whatever selections were saved for each direction.
if RUN_CONTEXT == "interactive":
    # --- Interactive mode ---
    sync_direction = st.radio(
        "Choose sync direction",
        ("sub2parent", "parent2sub"),
        horizontal=True
    )
    # NOTE(review): loaded but not referenced again in the visible code;
    # presumably intended for pre-filling the widgets — confirm.
    stored_state = load_state("sub_to_parent" if sync_direction == "sub2parent" else "parent_to_sub")
    subaccounts = fetch_subaccounts(pq)
    if not subaccounts:
        st.warning("No subaccounts found.")
        st.stop()

    if sync_direction == "sub2parent":
        st.title("🔁 Sync Subaccount Table(s) to Parent Account")
        col1, col2, col3 = st.columns(3)
        with col1:
            subaccount_names = [s["name"] for s in subaccounts]
            selected_sub_name = st.selectbox("Subaccount", subaccount_names, index=None)
            selected_sub = next((s for s in subaccounts if s["name"] == selected_sub_name), None)
            schemas, databases = [], []
            if selected_sub:
                subaccount_instance = pq.get_subaccount_instance(account_id=selected_sub["id"])
                schemas, databases = fetch_schemas(subaccount_instance)
        with col2:
            sub_schema_names = [s["name"] for s in schemas] if schemas else []
            selected_sub_schema_name = st.selectbox("Schema (Subaccount)", sub_schema_names, index=None)
            selected_schema = next((s for s in schemas if s["name"] == selected_sub_schema_name), None)
        with col3:
            tables = fetch_tables(databases, selected_schema["id"]) if selected_schema else []
            table_names = [t["name"] for t in tables]
            selected_table_names = st.multiselect("Table(s) (Subaccount)", table_names, default=None)

        st.subheader("🛢 Select Target in Parent Account")
        col4, col5 = st.columns(2)
        parent_databases = pq.list_databases()
        with col4:
            target_db_options = [db["server"]["friendly_name"] for db in parent_databases if "server" in db]
            selected_target_db_name = st.selectbox("Target Database (Parent)", target_db_options, index=None)
            # Same '"server" in db' filter as the option list, so databases
            # without a server entry cannot raise KeyError here.
            selected_target_db = next(
                (
                    db for db in parent_databases
                    if "server" in db and db["server"]["friendly_name"] == selected_target_db_name
                ),
                None,
            )
        with col5:
            target_schema_list = selected_target_db.get("schemas", []) if selected_target_db else []
            target_schema_names = [s["name"] for s in target_schema_list]
            selected_target_schema_name = st.selectbox("Target Schema (Parent)", target_schema_names, index=None)

        if st.button("💾 Save selections for scheduled sync"):
            selection_state = {
                "subaccount_id": selected_sub["id"] if selected_sub else None,
                "subaccount_schema": selected_schema["name"] if selected_schema else None,
                "selected_tables": selected_table_names,
                "target_db_name": selected_target_db_name,
                "target_schema_name": selected_target_schema_name,
            }
            # save_state merges into the existing state, so the selections
            # saved for the other direction are kept.
            save_state("sub_to_parent", selection_state)
            st.success("Selections saved for scheduled Subaccount→Parent sync!")

        if st.button("🚀 Sync Table(s) to Parent"):
            if not (selected_sub and selected_schema and selected_table_names and selected_target_schema_name and selected_target_db_name):
                st.error("Please select all fields.")
            else:
                sync_sub2parent_from_state(
                    subaccount_id=selected_sub["id"],
                    sub_schema_name=selected_schema["name"],
                    selected_table_names=selected_table_names,
                    target_schema_name=selected_target_schema_name,
                    target_db_name=selected_target_db_name
                )
    else:
        st.title("🔁 Sync Parent Account Table(s) to Subaccount")
        parent_databases = pq.list_databases()
        col1, col2, col3 = st.columns(3)
        with col1:
            source_db_options = [db["server"]["friendly_name"] for db in parent_databases if "server" in db]
            selected_source_db_name = st.selectbox("Source Database (Parent)", source_db_options, index=None)
            # Same '"server" in db' filter as the option list, so databases
            # without a server entry cannot raise KeyError here.
            selected_source_db = next(
                (
                    db for db in parent_databases
                    if "server" in db and db["server"]["friendly_name"] == selected_source_db_name
                ),
                None,
            )
        with col2:
            source_schema_list = selected_source_db.get("schemas", []) if selected_source_db else []
            source_schema_names = [s["name"] for s in source_schema_list]
            selected_source_schema_name = st.selectbox("Source Schema (Parent)", source_schema_names, index=None)
            selected_source_schema = next((s for s in source_schema_list if s["name"] == selected_source_schema_name), None)
        with col3:
            parent_tables = []
            if selected_source_schema:
                for db in parent_databases:
                    if "server" in db and db["server"]["friendly_name"] == selected_source_db_name:
                        for table in db.get("tables", []):
                            if table.get("schema_id") == selected_source_schema["id"]:
                                parent_tables.append(table)
            parent_table_names = [t["name"] for t in parent_tables]
            selected_parent_table_names = st.multiselect("Table(s) to Materialize", parent_table_names, default=None)

        st.subheader("📦 Select Target Subaccount & Schema")
        col4, col5 = st.columns(2)
        with col4:
            subaccount_names = [s["name"] for s in subaccounts]
            selected_sub_name = st.selectbox("Target Subaccount", subaccount_names, index=None)
            selected_sub = next((s for s in subaccounts if s["name"] == selected_sub_name), None)
        with col5:
            if selected_sub:
                subaccount_instance = pq.get_subaccount_instance(account_id=selected_sub["id"])
                schemas, databases = fetch_schemas(subaccount_instance)
                schema_names = [s["name"] for s in schemas]
                selected_sub_schema_name = st.selectbox("Target Schema (Subaccount)", schema_names, index=None)
            else:
                selected_sub_schema_name = st.selectbox("Target Schema (Subaccount)", [], index=None)

        if st.button("💾 Save selections for scheduled materialization"):
            selection_state = {
                "source_db_name": selected_source_db_name,
                "source_schema_name": selected_source_schema_name,
                "selected_tables": selected_parent_table_names,
                "subaccount_id": selected_sub["id"] if selected_sub else None,
                "subaccount_schema": selected_sub_schema_name,
            }
            # save_state merges into the existing state, so the selections
            # saved for the other direction are kept.
            save_state("parent_to_sub", selection_state)
            st.success("Selections saved for scheduled Parent→Subaccount materialization!")

        if st.button("🚀 Sync Table(s) to Subaccount"):
            if not (selected_sub and selected_sub_schema_name and selected_parent_table_names):
                st.error("Please select all required fields.")
            else:
                materialize_parent2sub_from_state(
                    subaccount_id=selected_sub["id"],
                    subaccount_owner_id=selected_sub["owner_id"],
                    subaccount_schema=selected_sub_schema_name,
                    source_db_name=selected_source_db_name,
                    source_schema_name=selected_source_schema_name,
                    selected_table_names=selected_parent_table_names
                )
elif RUN_CONTEXT == "scheduled":
    # --- Subaccount → Parent ---
    # Each direction's state is loaded once and reused for the log line and the call.
    state = load_state("sub_to_parent")
    if state:
        print(f"Starting Subaccount → Parent sync for {state.get('subaccount_id')}")
        print(state)
        sync_sub2parent_from_state(
            subaccount_id=state.get("subaccount_id"),
            sub_schema_name=state.get("subaccount_schema"),
            selected_table_names=state.get("selected_tables"),
            target_schema_name=state.get("target_schema_name"),
            target_db_name=state.get("target_db_name")
        )
        print("✅ Subaccount → Parent sync complete.")

    # --- Parent → Subaccount ---
    state = load_state("parent_to_sub")
    if state:
        print(f"Starting Parent → Subaccount materialization for {state.get('subaccount_id')}")
        # The owner id is not part of the saved state, so the subaccount is
        # looked up again among the currently active subaccounts.
        subaccounts = fetch_subaccounts(pq)
        selected_sub = next((s for s in subaccounts if s["id"] == state.get("subaccount_id")), None)
        if selected_sub:
            materialize_parent2sub_from_state(
                subaccount_id=state.get("subaccount_id"),
                subaccount_owner_id=selected_sub.get("owner_id"),
                subaccount_schema=state.get("subaccount_schema"),
                source_db_name=state.get("source_db_name"),
                source_schema_name=state.get("source_schema_name"),
                selected_table_names=state.get("selected_tables")
            )
            print("✅ Parent → Subaccount materialization complete.")
        else:
            print("❌ Subaccount not found.")