# MCP Server with oAuth, using SSO with Microsoft Azure Entra
# Version 3.1
#
# ADDING CAPABILITIES TO THIS MCP SERVER:
# Add more MCP "tools": see below section in code with all MCP tools, decorate your function with @mcp.tool()
##### SETTINGS
# Microsoft Azure tenant id:
tenant_id = "a35e450d-10f3-43ec-bbb5-4f370161c30c"
# Peliqan account id
peliqan_account_id = 123
# Add the Azure user name, and as value the personal API key from Peliqan (See user settings > API token). Store API keys in the Peliqan Secrets store !
user_mappings = {
"bill@acme.com": "eyJ0e...V5y-IhqbjU",
}
##### END OF SETTINGS
mcp_url = f"https://api.eu.peliqan.io/{peliqan_account_id}/mcp"
authorization_server_url = mcp_url
import json
import inspect
from typing import get_type_hints
from typing import List, Dict, Any
from urllib.parse import parse_qs
import base64
ENABLE_RAG = True
MCP_TOOLS = []
class mcp:
def tool():
def decorator(func):
sig = inspect.signature(func)
type_hints = get_type_hints(func)
# parse descriptions from docstring using :param style
raw_doc = func.__doc__ or ""
param_docs = {}
for line in raw_doc.splitlines():
line = line.strip()
if line.startswith(":param"):
# example: ":param first_name: the user's first name"
try:
_, rest = line.split("param", 1)
name, desc = rest.split(":", 1)
param_docs[name.strip()] = desc.strip()
except ValueError:
pass
properties = {}
required = []
for name, param in sig.parameters.items():
hint = type_hints.get(name, "any")
type_str = hint.__name__ if isinstance(hint, type) else str(hint)
if type_str == "str":
mcp_type_str = "string"
elif type_str == "int":
mcp_type_str = "number"
else:
mcp_type_str = "string"
prop = {
"type": mcp_type_str,
"description": param_docs.get(name, "")
}
if param.default is not inspect._empty:
prop["default"] = param.default
else:
required.append(name)
properties[name] = prop
MCP_TOOLS.append({
"name": func.__name__,
"description": raw_doc.strip().split("\n")[0] if raw_doc else "",
"inputSchema": {
"type": "object",
"properties": properties,
"required": required
}
})
return func
return decorator
def get_tool_response_format(tool_name):
func = globals().get(tool_name)
response_annotation = inspect.signature(func).return_annotation
if hasattr(response_annotation, "__name__"):
return response_annotation.__name__
return str(response_annotation).replace("typing.", "").replace("class '", "").replace("'>","")
if ENABLE_RAG:
openai_api = pq.connect('OpenAI')
EMBED_MODELS = {
"text-embedding-3-large": 3072,
"text-embedding-3-small": 1536
}
model = "text-embedding-3-large"
dimension = EMBED_MODELS[model]
def create_embedding(text):
"""Create embedding using OpenAI API"""
embedding_request = {
"input": text,
"model": model,
"dimensions": dimension
}
response = openai_api.get('embeddings', embedding_request)
embedding = response.get("data", [{}])[0].get("embedding")
return embedding
########################### ADD MCP TOOLS BELOW ###########################
@mcp.tool()
def say_hello(first_name: str, last_name: str = "") -> str:
"""
Peliqan will say hello.
:param first_name: the user's first name
:param last_name: optional last name
"""
return f"Hi there {first_name} {last_name} from Peliqan MCP server !"
@mcp.tool()
def list_connections() -> List[str]:
"""
Returns list of all ELT connections in the Peliqan account.
"""
connections = pq.list_connections()
conn_names = []
for connection in connections:
conn_names.append(connection["name"])
#return ", ".join(conn_names)
return conn_names
@mcp.tool()
def list_tables() -> List[Dict[str, Any]]:
"""
Returns list of all the tables in the Peliqan account.
"""
all_tables = []
for db in pq.list_databases():
for table in db["tables"]:
#schema_name = next((s["name"] for s in db["schemas"] if s["id"] == table["schema_id"]), None)
schema_name, schema_group_id = next(((s["name"], s["group"]) for s in db["schemas"] if s["id"] == table["schema_id"]), (None, None))
if schema_group_id == current_user_peliqan_group_id:
all_tables.append({
"db_id": db["id"],
"db": db["name"],
"schema_id": table["schema_id"],
"schema": schema_name,
"group_id": schema_group_id,
"table_id": table["id"],
"table": table["name"]
})
return all_tables
@mcp.tool()
def list_columns(table_id: int) -> List[str]:
"""
Returns list of all the fields (columns) of a table.
"""
table_meta = pq.get_table(table_id)
columns = table_meta.get("all_fields", [])
column_names = [col["name"] for col in columns if not col["name"].startswith("_sdc")]
return column_names
@mcp.tool()
def list_scripts() -> List[Dict[str, Any]]:
"""
Returns list of all scripts in the Peliqan account.
"""
scripts = pq.list_scripts()
scripts_list = [{"id": d.get("id"), "name": d.get("name")} for d in scripts]
return scripts_list
@mcp.tool()
def upsert_script(script_name: str, raw_script: str):
"""
Upserts a script by name in the Peliqan account.
Creates a new script if the name is not found, updates the script otherwise.
"""
scripts = pq.list_scripts()
found = False
for script in scripts:
if script["name"] == script_name:
result = pq.update_script(script_id = script["id"], raw_script = raw_script)
found = True
break
if not found:
result = pq.add_script(group_name = 'General', raw_script = raw_script, name = script_name)
return result
@mcp.tool()
def get_script_state(script_name: str):
"""
Get a script state in the Peliqan account.
Script state contains e.g. script configuration.
"""
scripts = pq.list_scripts()
for script in scripts:
if script_name == script["name"] or (str(script_name).isdigit() and str(script_name) == str(script["id"])):
script_meta_data = pq.get_script(script_id=script["id"])
return script_meta_data["state"]
@mcp.tool()
def execute_query(query: str) -> List[Dict[str, Any]]:
"""
Executes an SQL query on the data warehouse and returns a list of rows.
"""
query = query.replace('"' + pq.DW_NAME + '".', '').replace(pq.DW_NAME + '.', '')
dbconn = pq_personal.dbconnect(pq.DW_NAME)
rows = dbconn.fetch(pq.DW_NAME, query = query)
return rows
if ENABLE_RAG:
@mcp.tool()
def rag_search(search_text: str, schema: str, table: str) -> List[Dict[str, Any]]:
"""
Executes a RAG search on embedding tables in the data warehouse and returns a list of rows.
"""
top_k = 5
print(f"Search text: {search_text}, schema: {schema}, table: {table}")
search_embedding = create_embedding(search_text)
if search_embedding:
embedding_str = "[" + ",".join(map(str, search_embedding)) + "]"
search_query = f"""
SELECT id, text, metadata,
1 - (embedding <#> '{embedding_str}'::vector) AS similarity
FROM "{schema}"."rag_{table}"
ORDER BY similarity DESC
LIMIT {top_k}
"""
dbconn = pq_personal.dbconnect(pq.DW_NAME)
search_result = dbconn.execute(pq.DW_NAME, query=search_query)
records = search_result["detail"]
# Convert from list format with headers to list of dicts
dict_records = []
if records and len(records) > 0:
headers = records[0] # First row contains column names
for row in records[1:]: # Skip header row
record_dict = {}
for i, header in enumerate(headers):
record_dict[header] = row[i]
dict_records.append(record_dict)
return dict_records
else:
return []
@mcp.tool()
def pipedrive_add_contact(*args, **kwargs):
"""
Example of a writeback function: add a contact to Pipedrive (CRM).
"""
pipedrive_api = pq.connect('Pipedrive')
person = {}
if "name" in kwargs:
person["name"] = kwargs["name"]
if "email" in kwargs:
person["emails"] = [{'label': 'work', 'value': kwargs["email"], 'primary': True}]
result = pipedrive_api.add('person', person)
if "status" in result and result["status"] == "success":
new_contact = result.get("detail", {}).get("data", {})
return f"Contact added to Pipedrive: {new_contact}"
else:
print("Response from Pipedrive:", result)
error = result.get("detail", {}).get("response_json", {}).get("error", "")
return f"Error adding contact to Pipedrive: {error}"
########################### END OF MCP TOOLS ###########################
def log_request(request):
print("request method: ", request['method'])
print("request url: ", request['url'])
print("request query string: ", request['query_string'])
print("request headers: ", request['headers'])
print("request body:")
try:
print(json.dumps(request['data'], indent=2))
except:
print(request['data'])
def log_response(response):
print("Response:")
print(json.dumps(response, indent=2))
def mcp_response_initialize(id):
response_initialize = {
"jsonrpc": "2.0",
"id": id,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"callTool": True,
"listTools": True,
"tools": {
"listChanged": False
}
},
"serverInfo": {
"name": "peliqan-mcp",
"version": "0.0.1"
}
}
}
return response_initialize
def mcp_response_tools_list(id):
response_tools_list = {
"jsonrpc": "2.0",
"id": id,
"result": {
"tools": MCP_TOOLS
}
}
return response_tools_list
def mcp_response_tools_call(id, response_type):
response_tools_call = {
"jsonrpc": "2.0",
"id": id,
"result": {
"content": [
{
"type": response_type,
response_type: ""
}
],
"isError": False
}
}
return response_tools_call
def mcp_response_oauth_protected_resource():
# response for call to <mcp_server>/.well-known/oauth-protected-resource
response_oauth_protected_resource = {
"resource": mcp_url,
"authorization_servers": [authorization_server_url],
"scopes_supported": [f"{mcp_url}/.default"],
"bearer_methods_supported": ["header"]
}
return response_oauth_protected_resource
def mcp_response_authorization_server_openid_config_azure_entra():
# response for call to <authorization_server>/.well-known/openid-configuration
response_openid_config_v2 = {
"issuer": authorization_server_url,
"authorization_endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize",
"token_endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"]
}
return response_openid_config_v2
def check_access_token(access_token):
global pq_personal
parts = access_token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
payload = parts[1]
padding = "=" * (-len(payload) % 4)
decoded = base64.urlsafe_b64decode(payload + padding)
claims = json.loads(decoded)
azure_username = claims.get("upn", "Unkown")
print(f"Azure username from token: {azure_username}")
if azure_username not in user_mappings:
return None, None
else:
# Apply user impersonation
user_peliqan_api_key = user_mappings[azure_username]
pq_personal = Peliqan(user_peliqan_api_key)
# In case Azure groups from current user are needed to apply specific permissions in this MCP Server
azure_group_ids = claims.get("groups", []) # we can only get group ids, not names. Use "Microsoft Entra" connector to lookup group ids in table.
return azure_username
def handler(request):
log_request(request)
if "/.well-known/oauth-protected-resource" in request['url']:
print("MCP Server URL: oAuth protected resource URL called")
return mcp_response_oauth_protected_resource()
elif "/.well-known/openid-configuration" in request['url']:
print("Authorization server URL: openid configuration called")
return mcp_response_authorization_server_openid_config_azure_entra()
elif "Authorization" in request['headers']:
print("Checking Authorization header (should contain access token)")
access_token = request['headers']['Authorization'].replace("Bearer ", "")
azure_username = check_access_token(access_token)
if not azure_username:
print("Unknown user, make sure to add the Azure username to user_mappings")
return "Unauthorized", 401
else:
print("Not authorized")
protected_resource_url = f"{mcp_url}/.well-known/oauth-protected-resource"
return "Unauthorized", 401, { 'WWW-Authenticate': f'Bearer resource_metadata="{protected_resource_url}"' }
data = request['data']
if not data:
data = "{}"
mcp_req = json.loads(data)
id = 0
if "id" in mcp_req:
id = mcp_req["id"]
mcp_response = mcp_response_initialize(id)
if "method" in mcp_req:
if mcp_req["method"] == "initialize":
response = mcp_response_initialize(id)
elif mcp_req["method"] == "notifications/initialized":
mcp_response = ""
elif mcp_req["method"] == "tools/list":
mcp_response = mcp_response_tools_list(id)
elif mcp_req["method"] == "tools/call":
tool_name = mcp_req["params"]["name"]
args = {}
if "arguments" in mcp_req["params"]:
args = mcp_req["params"]["arguments"]
isError = False
try:
tool_response = globals()[tool_name](**args) #Invoking tool
except Exception as e:
print(e)
tool_response = str(e)
isError = True
tool_response_format = get_tool_response_format(tool_name) # str, List
response_type = "text"
mcp_response = mcp_response_tools_call(id, response_type)
if isError:
mcp_response["result"]["isError"] = isError
if tool_response_format == "str":
mcp_response["result"]["content"][0][response_type] = tool_response
else: # List
mcp_response["result"]["content"][0][response_type] = json.dumps(tool_response)
log_response(mcp_response)
return mcp_response