# Entity browser for Microsoft Dynamics 365 F&O (Finance & Operations).
# Use this to find entity names and their primary keys when adding entities to the pipeline.
# Contact support@peliqan.io for questions.
dynamicsax_api = pq.connect('MS Dynamics 365 Finance and Operations (AX)')
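# Set reload to True to refresh the entity list from the API instead of using the copy cached with pq.set_state()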
reload = False
def load_entities():
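    # Fetch all entity definitions from Dynamics F&O and return a list of
    # {"name": ..., "primary keys": ...} dicts, sorted by entity name.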
    st.text("Loading all entities from Dynamics F&O, this will take a minute or so.")
    entities = dynamicsax_api.get('entities')
    all_entities = []
    for entity in entities:
        entity_name = entity["@Name"]
        pks = entity.get("Key", {}).get("PropertyRef", [])
        if not isinstance(pks, list):
            pks = [pks]
        all_pks = []
        for pk in pks:
            pk_name = pk["@Name"]
            all_pks.append(pk_name)
        all_entities.append(
            {
                "name": entity_name,
                "primary keys": ", ".join(all_pks)
            }
        )
    all_entities = sorted(all_entities, key=lambda x: x["name"])
    return all_entities
all_entities = pq.get_state()
if not all_entities or reload:
    all_entities = load_entities()
    pq.set_state(all_entities)
#st.table(all_entities)
search = st.text_input("Search (matches on start of entity name only):")
if search:
    filtered_entities = [e for e in all_entities if e["name"].lower().startswith(search.lower())]
    selected_entity = st.selectbox("Entities (start typing to filter)", options=filtered_entities, format_func=lambda x: x["name"])
    if selected_entity:
        st.subheader("Entity name")
        st.write(selected_entity["name"])
        st.subheader("Entity primary keys")
        st.write(selected_entity["primary keys"])
# Bulk export from Microsoft Dynamics 365 F&O (Finance & Operations), and import into the Peliqan data warehouse.
# On the first run, this script fetches all entities from Dynamics F&O with their PKs (metadata) and caches them.
#
# Create a DMF Export project in F&O first (DMF = Data Management Framework).
# Use Source data format "XML-Element".
#
# Use list_projects() to find the correct name of the DMF export project in Dynamics F&O.
# Set the correct project name and legal entity below:
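# For example, you can temporarily call list_projects() (defined below) instead of main()
# at the bottom of this script to see the available export project names.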
export_project_name = "My export 2"
legal_entity_name = "DAT"
export_format = "XML" # or "CSV"
export_schema = "dynamics f&o bulk export"
###########################
import time
import zipfile
import io
import os
import requests
import pandas as pd
import re
import xml.etree.ElementTree as ET
from typing import Iterator, Dict, List
dynamicsax_api = pq.connect('MS Dynamics 365 Finance and Operations (AX)')
dbconn = pq.dbconnect(pq.DW_NAME)
extract_dir = f"/etc/shared_temp/dynamics_exported_data_{pq.INTERFACE_ID}"
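# Set reload_entities_from_api to True to refresh the cached entity/PK metadata from the API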
reload_entities_from_api = False
start_new_export_run = True # Set to False to skip the export and only import previously downloaded files
def main():
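    # Orchestrate the full flow: start a DMF export run, poll until it has succeeded,
    # download and unzip the export package, then import the extracted files into the DWH.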
    st.title("Microsoft Dynamics 365 F&O Bulk Export")
    if start_new_export_run:
        execution_id = start_export_run(export_project_name, legal_entity_name)
        execution_status = ""
        while "Succeeded" not in execution_status: # matches "Succeeded" and "PartiallySucceeded"
            time.sleep(10)
            execution_status = get_export_run_status(execution_id)
        export_url = get_export_url(execution_id)
        download_files(export_url)
    if export_format == "CSV":
        import_data_csv()
    elif export_format == "XML":
        import_data_xml()
    else:
        st.warning("Unknown export format")
def load_entities_pks():
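    # Load (and cache in pq state) a dict mapping each entity's plural name (lowercased) to its
    # PK fields. The exported files are named after the plural entity name (e.g. CustomersV3),
    # while the metadata uses the singular name, hence make_plural() below.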
    pks_all_entities = pq.get_state()
    if not pks_all_entities or reload_entities_from_api:
        st.text("Loading and caching all entities with their PKs from Dynamics F&O, this will take a minute or so.")
        def make_plural(word):
            match = re.search(r"(.*?)(V[1-4])$", word) # Special case: entities ending with V1, V2, V3, V4
            if match:
                base, version = match.groups()
                return make_plural(base) + version
            lower_word = word.lower()
            if lower_word == "person":
                return "people"
            if lower_word.endswith(("s", "sh", "ch", "x", "z")):
                return word + "es"
            if lower_word.endswith("y") and len(word) > 1 and word[-2].lower() not in "aeiou":
                return word[:-1] + "ies"
            if lower_word.endswith("f"):
                return word[:-1] + "ves"
            if lower_word.endswith("fe"):
                return word[:-2] + "ves"
            return word + "s"
        entities = dynamicsax_api.get('entities')
        pks_all_entities = {}
        for entity in entities:
            entity_name = entity["@Name"]
            entity_name_plural = make_plural(entity_name)
            pks = entity.get("Key", {}).get("PropertyRef", [])
            if not isinstance(pks, list):
                pks = [pks]
            all_pks = []
            for pk in pks:
                pk_name = pk["@Name"]
                if pk_name.upper() not in ["DATAAREAID"]:
                    all_pks.append(pk_name.upper())
            pks_all_entities[entity_name_plural.lower()] = all_pks
        pq.set_state(pks_all_entities)
    return pks_all_entities
def list_projects():
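    # Helper: show a table of all DMF projects of type "Export"; use it to find the value
    # for export_project_name above.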
    projects = dynamicsax_api.get('dmfprojects')
    export_projects = []
    for project in projects:
        if project["OperationType"] == "Export":
            export_projects.append(project["Name"])
    st.table(export_projects)
def start_export_run(export_project_name, legal_entity_name):
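    # Start an export run of the given DMF export project for one legal entity
    # and return the execution id of the run.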
    params = {
        'reExecute': False,
        'executionId': "",
        'packageName': "MyExportPackage.zip",
        'legalEntityId': legal_entity_name,
        'definitionGroupId': export_project_name
    }
    result = dynamicsax_api.add('exportrun', params)
    if result["status"] == "success":
        execution_id = result["detail"]["value"]
        st.write(f"Export run started, execution id: {execution_id}")
        return execution_id
    else:
        st.warning("Could not start export run")
        st.json(result)
        exit()
def get_export_run_status(execution_id):
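    # Return the current status of an export run (e.g. "Succeeded" once the run has finished).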
    params = {
        'executionId': execution_id
    }
    execution_status = dynamicsax_api.get('exportstatus', params)
    st.write(f"Execution status: {execution_status}")
    return execution_status
def get_export_url(execution_id):
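    # Return the download URL of the exported package (zip) for this execution.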
    params = {
        'executionId': execution_id
    }
    export_url = dynamicsax_api.get('exportdownloadurl', params)
    st.text(f"Export URL: {export_url}")
    return export_url
def download_files(url):
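    # Download the export package (zip) from the given URL and extract it into extract_dir.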
    os.makedirs(extract_dir, exist_ok=True)
    response = requests.get(url)
    response.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        zf.extractall(extract_dir)
    st.write(f"Files extracted to: {os.path.abspath(extract_dir)}")
def import_data_csv():
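    # Import every extracted CSV file into the data warehouse (one table per file),
    # using the entity's PK fields as primary key for the write.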
    pks_all_entities = load_entities_pks()
    for filename in os.listdir(extract_dir):
        st.subheader(f"File: {filename}")
        file_path = os.path.join(extract_dir, filename)
        if filename.lower().endswith(".csv"):
            df = pd.read_csv(file_path)
            st.write(df.head())
            if len(df) > 0:
                entity_name_plural = filename.replace(".csv", "").replace(" ", "") # e.g. CustomersV3
                table_name = filename.replace(".csv", "").lower().replace(" ", "_")
                if entity_name_plural.lower() in pks_all_entities:
                    pks = pks_all_entities[entity_name_plural.lower()]
                    st.info(f"Importing data from file: entity name {entity_name_plural}, PKs: {pks}")
                    result = dbconn.write(export_schema, table_name, records=df, pk=pks)
                    status = result["status"]
                    if status.lower() != "success":
                        st.error("Error writing to DWH")
                        st.write(result)
                else:
                    st.error(f"Cannot find PKs of entity {entity_name_plural}, skipping import of this file.")
            else:
                st.warning("No data in file, skipping.")
        else:
            st.warning("Not a CSV data file, skipping.")
        os.remove(file_path)
def read_xml_in_chunks(file_path: str, chunk_size: int = 1000) -> Iterator[List[Dict]]:
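    # Stream records from an exported XML file as lists of dicts, chunk_size records at a time.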
    # Processes all direct children of the Document root tag.
    context = ET.iterparse(file_path, events=('start', 'end'))
    chunk = []
    depth = 0
    current_record = None
    for event, elem in context:
        if event == 'start':
            depth += 1
            # Depth 2 means direct child of Document (depth 1)
            if depth == 2:
                current_record = elem
        elif event == 'end':
            # Process complete records (direct children of Document)
            if depth == 2 and current_record is not None:
                # Convert XML element to dictionary
                row_dict = {}
                # Add attributes if any
                if elem.attrib:
                    row_dict.update(elem.attrib)
                # Add child elements
                for child in elem:
                    if child.text and child.text.strip():
                        row_dict[child.tag] = child.text.strip()
                    elif len(child) > 0:
                        # Handle nested elements
                        for nested in child:
                            if nested.text and nested.text.strip():
                                row_dict[f"{child.tag}_{nested.tag}"] = nested.text.strip()
                # Add direct text if element has no children
                if len(elem) == 0 and elem.text and elem.text.strip():
                    row_dict['value'] = elem.text.strip()
                if row_dict: # Only add non-empty records
                    chunk.append(row_dict)
                # Clear element to free memory
                elem.clear()
                current_record = None
                # Yield chunk when it reaches the desired size
                if len(chunk) >= chunk_size:
                    yield chunk
                    chunk = []
            depth -= 1
    # Yield remaining records
    if chunk:
        yield chunk
    # Clean up
    del context
def import_data_xml():
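    # Import every extracted XML data file into the data warehouse in chunks,
    # skipping the package metadata files (Manifest.xml and PackageHeader.xml).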
    chunk_size = 1000
    pks_all_entities = load_entities_pks()
    for filename in os.listdir(extract_dir):
        st.subheader(f"File: {filename}")
        file_path = os.path.join(extract_dir, filename)
        if filename in ("Manifest.xml", "PackageHeader.xml"):
            pass
        elif filename.lower().endswith(".xml"):
            try:
                # Process first chunk to show preview
                total_records = 0
                entity_name_plural = filename.replace(".xml", "").replace(" ", "")
                table_name = filename.replace(".xml", "").lower().replace(" ", "_")
                if entity_name_plural.lower() not in pks_all_entities:
                    st.error(f"Cannot find PKs of entity {entity_name_plural}, skipping import of this file.")
                    continue
                pks = pks_all_entities[entity_name_plural.lower()]
                st.info(f"Importing data from file: entity name {entity_name_plural}, PKs: {pks}")
                # Process file in chunks
                for chunk_idx, chunk in enumerate(read_xml_in_chunks(file_path, chunk_size)):
                    if chunk_idx == 0:
                        # Show preview of first chunk
                        st.write(f"Preview (first {min(5, len(chunk))} records):")
                        st.write(chunk[:5])
                    total_records += len(chunk)
                    # Convert chunk to DataFrame for writing
                    df_chunk = pd.DataFrame(chunk)
                    result = dbconn.write(export_schema, table_name, records=df_chunk, pk=pks)
                    status = result["status"]
                    if status.lower() != "success":
                        st.error(f"Error writing chunk {chunk_idx + 1} to DWH")
                        st.write(result)
                        break
                    # Show progress
                    st.write(f"Processed chunk {chunk_idx + 1}: {len(chunk)} records")
                if total_records == 0:
                    st.warning("No data in file, skipping.")
                else:
                    st.success(f"Successfully imported {total_records} total records")
            except ET.ParseError as e:
                st.error(f"Error parsing XML file: {e}")
            except Exception as e:
                st.error(f"Error processing file: {e}")
        else:
            st.warning("Not an XML data file, skipping.")
        try:
            os.remove(file_path)
        except Exception as e:
            st.warning(f"Could not remove file: {e}")
main()