You can connect a Sybase database (now called SAP SQL Anywhere) in Peliqan using a custom pipeline script. The pipeline syncs a selection of tables (or all tables) from a Sybase database into the Peliqan data warehouse. The data is synced incrementally, based on a timestamp column; if no such column is present, a full sync can be performed instead. The script stores a “bookmark” per table in the state, in order to remember from which timestamp to sync new and updated records. Note that the script does not handle deleted records.
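For example, after a run the saved state could look like this (illustrative table names and timestamps):
{"table1": "2024-05-01 10:15:00.000", "table2": "2024-05-01 10:16:12.000"}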
Copy the code below into your account, and make the following changes to the script:
- Change the host, port, login and password to connect to the Sybase DB
- Set the list of tables to sync, with their PK (primary key) column
- Change the code in get_data_batch in case there is no "last updated" timestamp column (see the sketch after this list)
- Change the column name mod_date in the script, in case the timestamp column is named differently
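For reference, here is a minimal sketch of what get_data_batch could look like when the tables have no timestamp column and a full sync is done on every run, paging on the primary key instead (it uses the PK as configured in the tables list; the bookmark argument is then ignored):

def get_data_batch(table, batch_number, bookmark):
    # Full-sync variant (sketch): no mod_date filter, results ordered by the PK column(s) instead
    # Note: also adapt sync_table so it does not read rows[-1]["mod_date"] in that case
    batch_size = 2000
    batch_start = (batch_number * batch_size) + 1  # START AT is 1-based in SQL Anywhere
    pk = table["pk"]
    if isinstance(pk, list):
        pk = ",".join(pk)
    query = "select top %s start at %s * from dba.%s order by %s" % (batch_size, batch_start, table["name"], pk)
    return execute_query(query)

The full pipeline script: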
import sqlanydb
def to_dict_list(cursor, rows):
    # Convert a list of row tuples into a list of dicts, keyed by column name
    list_of_dicts = []
    for row in rows:
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        list_of_dicts.append(d)
    return list_of_dicts
def execute_query(query):
    cursor.execute(query)
    result = cursor.fetchall()
    result_dict = to_dict_list(cursor, result)
    return result_dict
def get_tables():
    # List user tables and their primary key column(s), to help build the "tables" list below
    query = """
        select t.table_id, t.table_name, t.count, c.column_name as pk_name
        from systable t left join syscolumn c on t.table_id = c.table_id
        where t.creator<>0 and t.table_type='BASE' and c.pkey='Y' and t.count>0
    """
    tables = execute_query(query)
    st.table(tables)
def get_columns(table_id):
    query = "select * from syscolumn where table_id = %s" % table_id  # see pkey for primary key(s)
    columns_with_pk = execute_query(query)
    st.table(columns_with_pk)
    query = "select * from systabcol where table_id = %s" % table_id  # see base_type_str for column type
    columns_with_types = execute_query(query)
    st.table(columns_with_types)
def get_data_batch(table, batch_number, bookmark):
    batch_size = 2000
    batch_start = (batch_number * batch_size) + 1  # batch_number starts at 0, START AT is 1-based
    # For a full sync (not incremental), in case there is no mod_date timestamp column,
    # use the PK in "order by" instead:
    #pk = table["pk"]
    #if isinstance(pk, list):
    #    pk = ','.join(table["pk"])
    #else:
    #    pk = table["pk"]
    query = "select top %s start at %s * from dba.%s where mod_date>='%s' order by mod_date asc" % (batch_size, batch_start, table["name"], bookmark)
    rows = execute_query(query)
    return rows
def show_data(table_name):
    # Preview the first 10 rows of a table
    query = "select top 10 * from dba.%s" % table_name
    rows = execute_query(query)
    st.table(rows)
def write_data(table, rows, schema):
    pk = table["pk"]
    if isinstance(pk, list):
        if len(pk) > 2:
            st.error("PK with more than 2 fields, adapt code for id field accordingly.")
            exit()
        # Composite PK: build a surrogate "id" column by concatenating the two PK fields
        for i, row in enumerate(rows):
            rows[i]["id"] = str(row[pk[0]]) + "_" + str(row[pk[1]])
        pk = "id"
    result = dbconn.write('sybase', table["name"], records=rows, object_schema=schema, pk=pk)  # transformer_mode='lossy'
    #st.write(result)
def get_schema(table):
    table_name = table["name"]
    pk = table["pk"]
    query = f"""
        select column_name, base_type_str as column_type from systabcol c
        inner join systable t on c.table_id = t.table_id
        where t.table_name = '{table_name}' order by base_type_str
    """
    columns_with_types = execute_query(query)
    #st.table(columns_with_types)
    schema = {"properties": {}, "type": "object"}
    for col in columns_with_types:
        schema["properties"][col["column_name"]] = {}
        # Map the Sybase/SQL Anywhere column type to a JSON schema type
        if "char" in col["column_type"]:
            col_schema = {"type": ["string", "null"]}
        elif "int" in col["column_type"]:
            col_schema = {"type": ["integer", "null"]}
        elif "numeric" in col["column_type"] or "float" in col["column_type"] or "decimal" in col["column_type"] or "double" in col["column_type"]:
            col_schema = {"type": ["number", "null"]}
        elif "date" in col["column_type"] or "timestamp" in col["column_type"] or "time" in col["column_type"]:
            col_schema = {"type": ["string", "null"], "format": "date-time"}
        else:
            st.error("Unknown data type: %s" % col["column_type"])
            exit()
        schema["properties"][col["column_name"]] = col_schema
    if isinstance(pk, list):
        # Composite PK: a concatenated "id" column is added in write_data
        schema["properties"]["id"] = {"type": ["string", "null"]}
    return schema
def get_bookmark(table):
    # Return the saved bookmark (last synced mod_date) for this table, or a default start date
    bookmarks = pq.get_state()
    if bookmarks and table["name"] in bookmarks:
        return bookmarks[table["name"]]
    else:
        return "2024-01-01 00:00:00.000"
def set_bookmark(table, bookmark):
    state = pq.get_state() or {}  # start with an empty state if nothing was saved yet
    state[table["name"]] = bookmark
    pq.set_state(state)
def sync_table(table, fullResync):
    st.header("Syncing table %s" % table["name"])
    schema = get_schema(table)
    if fullResync:
        bookmark = "2000-01-01 00:00:00.000"
    else:
        bookmark = get_bookmark(table)
    new_bookmark = None
    batch_number = 0
    st.text("Syncing table %s incremental with mod_date >= %s" % (table["name"], bookmark))
    while True:
        rows = get_data_batch(table, batch_number, bookmark)
        st.text("Batch %s, number of rows in batch: %s" % (batch_number, len(rows)))
        if len(rows) > 0:
            write_data(table, rows, schema)
            new_bookmark = rows[-1]["mod_date"]  # last row has the highest mod_date because results are ordered by mod_date asc
        else:
            if new_bookmark:
                st.text("Saving new bookmark: %s" % new_bookmark)
                set_bookmark(table, new_bookmark)
            st.text("Table sync done")
            return
        batch_number += 1
def sync_tables(tables):
    for table in tables:
        sync_table(table, fullResync=False)
####### MAIN #########

dbconn = pq.dbconnect(pq.DW_NAME)

connection = sqlanydb.connect(
    **{
        "host": "host:port",
        "uid": "login",
        "pwd": "pw",
        "dbn": "dbname",
    }
)
cursor = connection.cursor()

# Helpers to explore the source database (uncomment to run):
#show_data("some_table")
#get_tables()
#get_columns(table_id=123)

# Add tables to sync
tables = [
    {"name": "table1", "pk": "id"},
    {"name": "table2", "pk": "id"},
    {"name": "table3", "pk": "id"},
]
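# A table with a composite (2-column) primary key can be listed with the PK as a list,
# for example (hypothetical column names): {"name": "table4", "pk": ["order_id", "line_id"]}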
# Sync all tables
sync_tables(tables)
# Sync one table
# sync_table({"name": "tablename", "pk": "nummer"}, fullResync=True)
cursor.close()
connection.close()
st.text("Done")