Sybase (SAP SQL Anywhere) - Getting started in Peliqan

You can connect a Sybase database (now called SAP SQL Anywhere) in Peliqan using a custom pipeline script. The pipeline syncs a selection of tables (or all tables) from a Sybase database into the Peliqan data warehouse. Data is synced incrementally, based on a timestamp column; if no such column is present, a full sync can be performed instead. The script stores a “bookmark” per table in the pipeline state, in order to remember from which timestamp to sync new and updated records on the next run. Note that the script does not handle deleted records.
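For reference, the bookmark state is a plain dictionary that maps each table name to its last synced timestamp, read and written with pq.get_state and pq.set_state. A minimal sketch (the table name and timestamps below are illustrative):

bookmarks = pq.get_state() or {}                 # e.g. {"table1": "2024-03-01 12:34:56.000"}
bookmarks["table1"] = "2024-03-05 08:00:00.000"  # advance the bookmark after a successful sync
pq.set_state(bookmarks)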

Copy the code below into your account, and make the following changes to the script:

  • Change host, port, login and password to connect to the Sybase DB
  • Set the list of tables to sync, each with its PK (primary key) column
  • Change the code in get_data_batch in case there is no last-updated timestamp column (a commented full-sync variant is sketched inside get_data_batch below)
  • Change the column name mod_date in the script, in case the timestamp column is named differently
import sqlanydb
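# Note: pq and st are provided by the Peliqan pipeline runtime, no import needed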

def to_dict_list(cursor, rows):
    list_of_dicts = []
    for row in rows:
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        list_of_dicts.append(d)
    return list_of_dicts

def execute_query(query):
    cursor.execute(query)
    result = cursor.fetchall()
    result_dict = to_dict_list(cursor, result)
    return result_dict

def get_tables():
    query = """
    select t.table_id, t.table_name, t.count, c.column_name as pk_name 
    from systable t left join syscolumn c on t.table_id = c.table_id where 
    t.creator<>0 and t.table_type='BASE' and c.pkey='Y' and t.count>0
    """
    tables = execute_query(query)
    st.table(tables)

def get_columns(table_id):
    query = f"select * from syscolumn where table_id = %s" % table_id # see pkey for primary key(s)
    columns_with_pk = execute_query(query)
    st.table(columns_with_pk)

    query = f"select * from systabcol where table_id = %s" % table_id # see base_type_str for column type
    columns_with_types = execute_query(query)
    st.table(columns_with_types)

def get_data_batch(table, batch_number, bookmark):
    batch_size = 2000
    batch_start = (batch_number * batch_size) + 1  # batch_number starts at 0

    # for full sync (not incremental), in case there is no mod_date timestamp column
    # use pk in "order by"
    #pk = table["pk"]
    #if isinstance(pk, list):
    #    pk = ','.join(table["pk"])
    #else:
    #    pk = table["pk"]
    
    query = f"select top %s start at %s * from dba.%s where mod_date>='%s' order by mod_date asc" % (batch_size, batch_start, table["name"], bookmark)
    rows = execute_query(query)
    return rows

def show_data(table_name):
    query = f"select top 10 * from dba.%s" % table_name
    rows = execute_query(query)
    st.table(rows)

def write_data(table, rows, schema):
    pk = table["pk"]
    if isinstance(pk, list):
        if len(pk) > 2:
            st.error("PK with more than 2 fields, adapt code for id field accordingly.")
            exit()
        # Composite PK: build a surrogate "id" column by concatenating the PK parts
        for i, row in enumerate(rows):
            rows[i]["id"] = str(row[pk[0]]) + "_" + str(row[pk[1]])
        pk = "id"
    result = dbconn.write('sybase', table["name"], records=rows, object_schema=schema, pk=pk) # transformer_mode='lossy'
    #st.write(result)

def get_schema(table):
    table_name = table["name"]
    pk = table["pk"]
    query = f"""
    select column_name, base_type_str as column_type from systabcol c 
    inner join systable t on c.table_id = t.table_id
    where t.table_name = '{table_name}' order by base_type_str
    """
    
    columns_with_types = execute_query(query)
    #st.table(columns_with_types)
    
    schema = { "properties": {}, "type": "object" }
    for col in columns_with_types:
        if "char" in col["column_type"]:
            col_schema = {"type": ["string", "null"]}
        elif "int" in col["column_type"]:
            col_schema = {"type": ["integer", "null"]}
        elif "numeric" in col["column_type"] or "float" in col["column_type"] or "decimal" in col["column_type"] or "double" in col["column_type"]:
            col_schema = {"type": ["number", "null"]}
        elif "date" in col["column_type"] or "timestamp" in col["column_type"] or "time" in col["column_type"]:
            col_schema = {"type": ["string", "null"], "format": "date-time"}
        else:
            st.error(f"Unknown data type: {col['column_type']}")
            exit()
        schema["properties"][col["column_name"]] = col_schema
    if isinstance(pk, list):
        # Composite PK: write_data adds a combined "id" column, so declare it in the schema
        schema["properties"]["id"] = {"type": ["string", "null"]}
    return schema

def get_bookmark(table):
    bookmarks = pq.get_state()
    if bookmarks and table["name"] in bookmarks:
        return bookmarks[table["name"]]
    else:
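        # No bookmark stored yet: fall back to an initial timestamp (adjust to your data)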
        return "2024-01-01 00:00:00.000"

def set_bookmark(table, bookmark):
    state = pq.get_state() or {}  # state can be empty on the very first run
    state[table["name"]] = bookmark
    pq.set_state(state)
    
def sync_table(table, fullResync):
    st.header(f"Syncing table %s" % table["name"])
    schema = get_schema(table)
    if fullResync:
        bookmark = "2000-01-01 00:00:00.000"
    else:
        bookmark = get_bookmark(table)
    new_bookmark = None
    batch_number = 0
    st.text(f"Syncing table %s incremental with mod_date >= %s" % (table["name"], bookmark))
    while True:
        rows = get_data_batch(table, batch_number, bookmark)
        st.text(f"Batch %s, number of rows in batch: %s" % (batch_number, len(rows)))
        if len(rows)>0:
            write_data(table, rows, schema)
            new_bookmark = rows[-1]["mod_date"] # last row is highest mod_date because results are ordered by mod_date asc
        else:
            if new_bookmark:
                st.text(f"Saving new bookmark: %s" % new_bookmark)
                set_bookmark(table, new_bookmark)
            st.text("Table sync done")
            return
        batch_number += 1
    
    
def sync_tables(tables):
    for table in tables:
        sync_table(table, fullResync=False)



####### MAIN #########

dbconn = pq.dbconnect(pq.DW_NAME)
# Replace with your Sybase connection details (host includes the port)
connection = sqlanydb.connect(
    host="host:port",
    uid="login",
    pwd="pw",
    dbn="dbname",
)
cursor = connection.cursor()

#show_data("some_table")
#get_tables()
#get_columns(table_id=123)

# Add tables to sync
tables = [
    {"name": "table1",  "pk": "id"},
    {"name": "table2",  "pk": "id"},
    {"name": "table3",  "pk": "id"},
]
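# For a composite primary key, pass a list of columns, e.g. {"name": "table4", "pk": ["col1", "col2"]};
# write_data then builds a combined "id" column from the parts.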

# Sync all tables
sync_tables(tables)

# Sync one table
# sync_table({"name": "tablename", "pk": "nummer"}, fullResync=True)

cursor.close()
connection.close()
st.text("Done")