The following example custom pipeline script in Peliqan reads Parquet files from an AWS S3 bucket and imports them into the data warehouse:
import boto3
import pandas as pd
from io import BytesIO

# "pq" is provided by the Peliqan scripting environment
access_key = "xxx"  # replace with your AWS access key id
secret_access_key = pq.get_secret("S3 secret")  # use the Peliqan Secret Store
bucket_name = "mybucket"
schema_name = "s3_import"    # example target schema in the data warehouse
table_name = "parquet_data"  # example target table in the data warehouse

# Connect to the Peliqan data warehouse
dw = pq.dbconnect(pq.DW_NAME)

session = boto3.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_access_key
)
s3 = session.resource('s3')
bucket = s3.Bucket(bucket_name)

for obj in bucket.objects.all():
    key = obj.key
    if key.endswith('.parquet'):
        # Read the Parquet file into a DataFrame and convert to a list of dicts
        file_contents = obj.get()['Body'].read()
        df = pd.read_parquet(BytesIO(file_contents))
        rows = df.to_dict(orient='records')

        # Write in batches, for large files
        batch_size = 1000
        batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
        for batch in batches:
            result = dw.write(schema_name, table_name, batch, pk='id')
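
For large buckets, iterating over every object can be slow. boto3 can restrict the listing to a key prefix instead; a minimal sketch of that variant, reusing the bucket object from the script above (the "raw/" prefix is an illustrative assumption):

for obj in bucket.objects.filter(Prefix="raw/"):  # list only keys under "raw/" (illustrative prefix)
    if obj.key.endswith('.parquet'):
        # same read-and-import logic as above
        ...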