MongoDB is a popular open-source NoSQL database that utilizes a flexible, document-oriented data model, making it ideal for handling large volumes of unstructured data. Known for its scalability and performance, MongoDB supports modern application development with features like indexing, sharding, and real-time data processing.
This article provides an overview of how to get started with the MongoDB connector in Peliqan. Please contact support if you have any additional questions or remarks.
Below is an example Python script that adds or updates a _schema collection in each database. The script introspects the first 100 records of each collection to generate a _schema record:
import pymongo
from pymongo.server_api import ServerApi

username = 'xxx'
password = 'xxx'
host = 'cluster0.xxx.mongodb.net'

client = pymongo.MongoClient(
    'mongodb+srv://' + username + ':' + password + '@' + host + '/?retryWrites=true&w=majority',
    server_api=ServerApi('1'))

# This script connects to a MongoDB server and creates or updates a _schema collection
# in each DB, holding the schema (column names and types) of all collections in that DB.
# _schema is needed for Trino to make SQL queries on MongoDB. Trino data types to be
# used in _schema: https://trino.io/docs/current/language/types.html

def update_schema(db, collection):
    # Infer a type for each key by inspecting the first 100 documents
    key_types = {}
    cursor = collection.find().limit(100)
    for document in cursor:
        for key, val in document.items():
            if key not in key_types:
                if key == '_id':
                    key_type = 'ObjectId'
                elif isinstance(val, (dict, list)):
                    key_type = 'varchar'
                elif isinstance(val, int) or (isinstance(val, str) and val.isdigit()):
                    key_type = 'bigint'
                elif isinstance(val, float):
                    key_type = 'real'
                else:
                    key_type = 'varchar'
                key_types[key] = key_type
    fields = []
    for key, key_type in key_types.items():
        fields.append({'name': key, 'type': key_type, 'hidden': False})
    schema = {'table': collection.name, 'fields': fields}
    print("--Collection schema: " + str(schema))
    # If a _schema record for this collection does not exist yet, upsert=True adds it
    db['_schema'].update_one({'table': collection.name}, {"$set": schema}, upsert=True)

def process_dbs_and_tables(client):
    for db_name in client.list_database_names():
        print("Database: " + db_name)
        db = client[db_name]
        # Optionally reset first: db.drop_collection('_schema')
        for collection_name in db.list_collection_names():
            # Skip the _schema collection itself and internal collections
            if collection_name not in ['_schema', 'oplog.rs']:
                print("--Collection: " + collection_name)
                update_schema(db, db[collection_name])

process_dbs_and_tables(client)
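To illustrate the type mapping the script applies, the inference rules can be pulled out into a standalone function and run against a sample document without a live MongoDB connection. This is only a sketch: infer_type is a hypothetical helper for illustration, not part of the script above, and the sample document is invented.

```python
def infer_type(key, val):
    # Same mapping rules as update_schema in the script above
    if key == '_id':
        return 'ObjectId'
    if isinstance(val, (dict, list)):
        return 'varchar'           # nested structures are exposed as varchar
    if isinstance(val, int) or (isinstance(val, str) and val.isdigit()):
        return 'bigint'            # ints, and strings that look like ints
    if isinstance(val, float):
        return 'real'
    return 'varchar'               # everything else falls back to varchar

# Hypothetical sample document
doc = {'_id': 1, 'name': 'Alice', 'age': '42', 'score': 3.5, 'tags': ['a', 'b']}
print({k: infer_type(k, v) for k, v in doc.items()})
```

Note that the mapping is based only on the first occurrence of each key, so a field whose type varies across documents keeps the type inferred from the first document that contains it.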
Need further help?
Please contact our support team at support@peliqan.io for further assistance.