import sqlanydb
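# NOTE: "st" (UI/log output) and "pq" (warehouse connection and pipeline state) are assumed to be
# provided by the hosting data-app runtime; they are not imported in this script.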
def to_dict_list(cursor, rows):
    list_of_dicts = []
    for row in rows:
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        list_of_dicts.append(d)
    return list_of_dicts
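# Run a query on the module-level SQL Anywhere cursor and return the result as a list of dicts.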
def execute_query(query):
    cursor.execute(query)
    result = cursor.fetchall()
    result_dict = to_dict_list(cursor, result)
    return result_dict
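# Exploration helpers: list user base tables (row count and PK column) and the columns of a
# given table, straight from the SQL Anywhere catalog (systable, syscolumn, systabcol).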
def get_tables():
    query = """
        select t.table_id, t.table_name, t.count, c.column_name as pk_name
        from systable t left join syscolumn c on t.table_id = c.table_id
        where t.creator<>0 and t.table_type='BASE' and c.pkey='Y' and t.count>0
    """
    tables = execute_query(query)
    st.table(tables)
def get_columns(table_id):
    query = "select * from syscolumn where table_id = %s" % table_id  # see pkey for primary key(s)
    columns_with_pk = execute_query(query)
    st.table(columns_with_pk)
    query = "select * from systabcol where table_id = %s" % table_id  # see base_type_str for column type
    columns_with_types = execute_query(query)
    st.table(columns_with_types)
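# Fetch one page of rows using SQL Anywhere's "top N start at M" paging, filtered on
# mod_date >= bookmark and ordered by mod_date so the last row carries the newest timestamp.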
def get_data_batch(table, batch_number, bookmark):
    batch_size = 2000
    batch_start = (batch_number * batch_size) + 1  # batch_number starts at 0; "start at" is 1-based
    # for a full sync (not incremental), in case there is no mod_date timestamp column,
    # use the pk in "order by" instead:
    # pk = table["pk"]
    # if isinstance(pk, list):
    #     pk = ','.join(table["pk"])
    # else:
    #     pk = table["pk"]
    query = "select top %s start at %s * from dba.%s where mod_date>='%s' order by mod_date asc" % (batch_size, batch_start, table["name"], bookmark)
    rows = execute_query(query)
    return rows
def show_data(table_name):
    query = "select top 10 * from dba.%s" % table_name
    rows = execute_query(query)
    st.table(rows)
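# Write one batch of rows to the data warehouse. For a composite (two-field) primary key,
# a concatenated "id" field is generated and used as the key instead.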
def write_data(table, rows, schema):
    pk = table["pk"]
    if isinstance(pk, list):
        if len(pk) > 2:
            st.error("PK with more than 2 fields, adapt code for id field accordingly.")
            exit()
        # concatenate the PK fields into a single "id" field (cast to str in case they are not strings)
        for i, row in enumerate(rows):
            rows[i]["id"] = str(row[pk[0]]) + "_" + str(row[pk[1]])
        pk = "id"
    result = dbconn.write('sybase', table["name"], records=rows, object_schema=schema, pk=pk)  # transformer_mode='lossy'
    #st.write(result)
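# Build a JSON-Schema-style object schema for the destination by mapping SQL Anywhere base
# types (systabcol.base_type_str) to string / integer / number / date-time properties.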
def get_schema(table):
    table_name = table["name"]
    pk = table["pk"]
    query = f"""
        select column_name, base_type_str as column_type from systabcol c
        inner join systable t on c.table_id = t.table_id
        where t.table_name = '{table_name}' order by base_type_str
    """
    columns_with_types = execute_query(query)
    #st.table(columns_with_types)
    schema = {"properties": {}, "type": "object"}
    for col in columns_with_types:
        schema["properties"][col["column_name"]] = {}
        if "char" in col["column_type"]:
            col_schema = {"type": ["string", "null"]}
        elif "int" in col["column_type"]:
            col_schema = {"type": ["integer", "null"]}
        elif "numeric" in col["column_type"] or "float" in col["column_type"] or "decimal" in col["column_type"] or "double" in col["column_type"]:
            col_schema = {"type": ["number", "null"]}
        elif "date" in col["column_type"] or "timestamp" in col["column_type"] or "time" in col["column_type"]:
            col_schema = {"type": ["string", "null"], "format": "date-time"}
        else:
            st.error("Unknown data type: %s" % col["column_type"])
            exit()
        schema["properties"][col["column_name"]] = col_schema
    if isinstance(pk, list):
        # composite PK: write_data() adds a concatenated "id" field, so include it in the schema
        schema["properties"]["id"] = {"type": ["string", "null"]}
    return schema
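# Bookmark handling: the last synced mod_date per table is kept in the pipeline state
# (pq.get_state / pq.set_state), so the next run only fetches rows changed since then.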
def get_bookmark(table):
    bookmarks = pq.get_state()
    if bookmarks and table["name"] in bookmarks:
        return bookmarks[table["name"]]
    else:
        return "2024-01-01 00:00:00.000"
def set_bookmark(table, bookmark):
    state = pq.get_state() or {}  # state can be empty on the first run
    state[table["name"]] = bookmark
    pq.set_state(state)
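# Sync one table: fetch 2000-row batches ordered by mod_date, write each batch to the
# destination, then save the highest mod_date seen as the new bookmark.
# Pass fullResync=True to re-read everything from 2000-01-01 instead of the stored bookmark.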
def sync_table(table, fullResync):
    st.header("Syncing table %s" % table["name"])
    schema = get_schema(table)
    if fullResync:
        bookmark = "2000-01-01 00:00:00.000"
    else:
        bookmark = get_bookmark(table)
    new_bookmark = None
    batch_number = 0
    st.text("Syncing table %s incrementally with mod_date >= %s" % (table["name"], bookmark))
    while True:
        rows = get_data_batch(table, batch_number, bookmark)
        st.text("Batch %s, number of rows in batch: %s" % (batch_number, len(rows)))
        if len(rows) > 0:
            write_data(table, rows, schema)
            new_bookmark = rows[-1]["mod_date"]  # last row has the highest mod_date because results are ordered by mod_date asc
        else:
            if new_bookmark:
                st.text("Saving new bookmark: %s" % new_bookmark)
                set_bookmark(table, new_bookmark)
            st.text("Table sync done")
            return
        batch_number += 1
def sync_tables(tables):
    for table in tables:
        sync_table(table, fullResync=False)
####### MAIN #########
dbconn = pq.dbconnect(pq.DW_NAME)
connection = sqlanydb.connect(
    **{
        "host": "host:port",
        "uid": "login",
        "pwd": "pw",
        "dbn": "dbname",
    }
)
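# sqlanydb passes these keyword arguments through as SQL Anywhere connection parameters;
# replace the placeholder values ("host:port", "login", "pw", "dbname") with real ones.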
cursor = connection.cursor()
#show_data("some_table")
#get_tables()
#get_columns(table_id=123)
# Add tables to sync
tables = [
    {"name": "table1", "pk": "id"},
    {"name": "table2", "pk": "id"},
    {"name": "table3", "pk": "id"},
]
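# Example of a table with a composite (two-column) primary key; write_data() and get_schema()
# then generate a concatenated "id" field (table/column names below are hypothetical):
# tables.append({"name": "order_lines", "pk": ["order_id", "line_no"]})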
# Sync all tables
sync_tables(tables)
# Sync one table
# sync_table({"name": "tablename", "pk": "nummer"}, fullResync=True)
cursor.close()
connection.close()
st.text("Done")