Peliqan uses Trino to connect to MongoDB and to execute SQL queries on MongoDB.
A collection named _schema must be present in each database; it defines, for every
collection in that database, the column names and Trino data types that Trino
should expose.
Example record in _schema:
{
"_id": {
"$oid": "65b8fb86593a75d575ee835b"
},
"table": "my_collection",
"fields": [
{
"name": "_id",
"type": "ObjectId",
"hidden": false
},
{
"name": "name",
"type": "varchar",
"hidden": false
},
{
"name": "age",
"type": "bigint",
"hidden": false
}
]
}
Below is an example Python script that will add and/or update _schema
in each DB. This script will perform introspection on the first 100 records of each collection to generate a _schema
record:
Script:
import pymongo
from pymongo.server_api import ServerApi
username = 'xxx'
password = 'xxx'
host = 'cluster0.xxx.mongodb.net'
client = pymongo.MongoClient('mongodb+srv://' + username + ':' + password + '@' + host + '/?retryWrites=true&w=majority', server_api=ServerApi('1'))
# This script will connect to a MongoDB and create or update a collection _schema in each DB, with the schema (column names and types) of all the collections in that DB.
# _schema is needed for Trino to make SQL queries on MongoDB. Trino data types to be used in _schema: https://trino.io/docs/current/language/types.html#real
def update_schema(db, collection):
key_types = {}
cursor = collection.find().limit(100)
for document in cursor:
#print("----Document for introspection: " + str(document))
for key, val in document.items():
if key not in key_types:
if key == '_id':
key_type = 'ObjectId'
elif isinstance(val, dict) or isinstance(val, list):
key_type = 'varchar'
elif isinstance(val, int) or val.isdigit():
key_type = 'bigint'
elif isinstance(val, (float, complex)):
key_type = 'real'
else:
key_type = 'varchar'
key_types[key] = key_type
fields = []
for key, key_type in key_types.items():
fields.append({'name': key, 'type': key_type, 'hidden': False})
schema = {'table': collection.name, 'fields': fields}
print("--Collection schema: " + str(schema))
#if collection _schema does not exist, it will automatically be added
db['_schema'].update_one({'table': collection.name}, {"$set": schema}, upsert=True)
def process_dbs_and_tables(client):
dbs = client.list_database_names()
for db_name in dbs:
print("Database: " + db_name)
db = client[db_name]
#db.drop_collection('_schema')
for collection_name in db.list_collection_names():
if collection_name not in ['_schema', 'oplog.rs']:
print("--Collection: " + collection_name)
update_schema(db, db[collection_name])
process_dbs_and_tables(client)