Getting started with MongoDB in Peliqan

Peliqan uses Trino to connect to MongoDB and to execute SQL queries on MongoDB.

Each database must contain a _schema collection, which defines the schema (field names and types) of the other collections in that database.

Example record in _schema, describing the collection my_collection:

{
    "_id": {
        "$oid": "65b8fb86593a75d575ee835b"
    },
    "table": "my_collection",
    "fields": [
        {
            "name": "_id",
            "type": "ObjectId",
            "hidden": false
        },
        {
            "name": "name",
            "type": "varchar",
            "hidden": false
        },
        {
            "name": "age",
            "type": "bigint",
            "hidden": false
        }
    ]
}

Below is an example Python script that adds or updates _schema in each database. For every collection, it inspects up to the first 100 documents to infer the field names and types, and writes the result as one _schema record:

Click here to expand script
from urllib.parse import quote_plus

import pymongo
from pymongo.server_api import ServerApi

username = 'xxx'
password = 'xxx'
host = 'cluster0.xxx.mongodb.net'
# Username and password must be percent-escaped before they are embedded in the
# connection URI; otherwise characters such as '@', ':' or '/' break URI parsing.
client = pymongo.MongoClient(
    f'mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}'
    '/?retryWrites=true&w=majority',
    server_api=ServerApi('1'),
)

# This script connects to MongoDB and, in each database, creates or updates a _schema collection
# that records the schema (column names and types) of the collections in that database.
# Trino needs _schema to run SQL queries on MongoDB. For valid type names, see the Trino data types
# documentation: https://trino.io/docs/current/language/types.html

def _trino_type(key, val):
    """Return the Trino type name for one sampled value, or None if it is undecided.

    None is returned for a null value so that a later document can still
    determine the field's type.
    """
    if key == '_id':
        return 'ObjectId'
    if val is None:
        return None
    if isinstance(val, (dict, list)):
        # Nested documents and arrays are exposed as text.
        return 'varchar'
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(val, bool):
        return 'boolean'
    if isinstance(val, int):
        return 'bigint'
    # NOTE(review): digit-only strings are declared bigint, as in the original
    # script; confirm Trino can read such string values as bigint.
    if isinstance(val, str) and val.isdigit():
        return 'bigint'
    if isinstance(val, float):
        return 'real'
    return 'varchar'


def update_schema(db, collection, sample_size=100):
    """Infer a collection's field types and upsert them as a record in db['_schema'].

    Up to ``sample_size`` documents of ``collection`` are inspected. Each
    field's type comes from the first non-null value seen for it; fields that
    are null in every sampled document default to 'varchar'. Fields are listed
    in the order they were first encountered.

    Args:
        db: Database holding the _schema collection (accessed as db['_schema']).
        collection: Collection to introspect; its ``name`` is stored as the
            record's 'table' value.
        sample_size: Maximum number of documents to inspect (default 100).
    """
    # field name -> Trino type, or None while only null values have been seen.
    key_types = {}
    for document in collection.find().limit(sample_size):
        for key, val in document.items():
            if key_types.get(key) is None:
                key_types[key] = _trino_type(key, val)
    fields = [
        {'name': key, 'type': key_type or 'varchar', 'hidden': False}
        for key, key_type in key_types.items()
    ]
    schema = {'table': collection.name, 'fields': fields}
    print("--Collection schema: " + str(schema))
    # upsert=True creates the record (and the _schema collection) if missing.
    db['_schema'].update_one({'table': collection.name}, {"$set": schema}, upsert=True)

def process_dbs_and_tables(client, skip_databases=('admin', 'local', 'config')):
    """Create or update the _schema record of every collection in each user database.

    Databases named in ``skip_databases`` are ignored. Within each remaining
    database, every collection except _schema itself, 'oplog.rs' and MongoDB's
    internal 'system.*' collections is introspected with update_schema.

    Args:
        client: A connected pymongo.MongoClient.
        skip_databases: Database names to leave untouched. Defaults to MongoDB's
            built-in admin/local/config databases, which hold server metadata
            rather than application data. Pass () to process every database.
    """
    for db_name in client.list_database_names():
        if db_name in skip_databases:
            continue
        print("Database: " + db_name)
        db = client[db_name]
        # Uncomment to rebuild _schema from scratch instead of updating it in place:
        # db.drop_collection('_schema')
        for collection_name in db.list_collection_names():
            if collection_name in ('_schema', 'oplog.rs') or collection_name.startswith('system.'):
                continue
            print("--Collection: " + collection_name)
            update_schema(db, db[collection_name])

# Run the schema refresh against the MongoDB deployment configured above.
process_dbs_and_tables(client=client)