You can build low-code data pipelines in Peliqan that import data from S3 buckets or write data into S3 buckets.
Export data to S3
Example to write a file to an S3 bucket:
from io import BytesIO
import boto3

aws_access_key_id = "your_aws_access_key_id"
aws_secret_access_key = "your_aws_secret_access_key"
aws_bucket_name = "your_bucket_name"

s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# The file contents (bytes) and the target object name in the bucket
file_name = "my_file.txt"
file_contents = b"example file contents"

# Upload the in-memory bytes as an object in the bucket
s3_client.upload_fileobj(BytesIO(file_contents), aws_bucket_name, file_name)
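To confirm that the upload succeeded, you can list the objects in the bucket. A minimal sketch using boto3's list_objects_v2, reusing the s3_client and aws_bucket_name variables from the example above:

response = s3_client.list_objects_v2(Bucket=aws_bucket_name)
for obj in response.get('Contents', []):
    # Print the object key and its size in bytes
    print(obj['Key'], obj['Size'])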
Example to export a table from the Peliqan data warehouse into a CSV file on S3:
from io import StringIO
import pandas as pd
import boto3

aws_access_key_id = "your_aws_access_key_id"
aws_secret_access_key = "your_aws_secret_access_key"
aws_bucket_name = "your_bucket_name"

s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Fetch the table from the Peliqan data warehouse as a Pandas dataframe
dbconn = pq.dbconnect(pq.DW_NAME)
df = dbconn.fetch(pq.DW_NAME, 'schema_name', 'table_name', df=True)

# Serialize the dataframe to CSV in memory and upload it to S3
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
csv_data = csv_buffer.getvalue()
s3_client.put_object(Bucket=aws_bucket_name, Key='my_data.csv', Body=csv_data)
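For larger tables, Parquet is usually a better fit than CSV because it is compressed and preserves column types. A minimal sketch of the same export as a Parquet file, assuming the pyarrow package is available in your script environment:

from io import BytesIO

# Serialize the dataframe to Parquet in memory (requires pyarrow)
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False)
s3_client.put_object(Bucket=aws_bucket_name, Key='my_data.parquet', Body=parquet_buffer.getvalue())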
Import data from S3
Example to import CSV files from S3 into a table in the Peliqan DWH:
import boto3
import pandas as pd
from io import BytesIO

aws_access_key_id = "your_aws_access_key_id"
aws_secret_access_key = "your_aws_secret_access_key"
aws_bucket_name = "your_bucket_name"

schema_name = 'schema_name'
table_name = 'table_name'

dw = pq.dbconnect(pq.DW_NAME)

session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
s3 = session.resource('s3')
bucket = s3.Bucket(aws_bucket_name)

for obj in bucket.objects.all():
    key = obj.key
    if key.endswith('.csv'):
        # Read the CSV file from S3 into a Pandas dataframe
        file_contents = obj.get()['Body'].read()
        df = pd.read_csv(BytesIO(file_contents), sep=';', encoding='utf-8')

        # Example to add a unique ID column if missing in the CSV
        df['id'] = df['OrderNumber'].astype(str) + '_' + df['Date'].astype(str)

        # Example converting date format (DD/MM/YYYY to YYYY-MM-DD)
        df['Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y').dt.strftime('%Y-%m-%d')

        # Replace empty cells (NaN) with empty strings
        df = df.fillna('')

        rows = df.to_dict(orient='records')

        # Write in batches of 100 rows, to handle large CSV files
        batches = [rows[i:i+100] for i in range(0, len(rows), 100)]
        for batch in batches:
            result = dw.write(schema_name, table_name, batch, pk='id')
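To avoid importing the same files again on the next run, you can move each processed file out of the way after a successful write. A minimal sketch, assuming a hypothetical "processed/" prefix in the same bucket; it would run inside the loop above, reusing the bucket and key variables:

# Copy the processed file to a "processed/" prefix, then delete the original
archive_key = 'processed/' + key
bucket.Object(archive_key).copy_from(CopySource={'Bucket': aws_bucket_name, 'Key': key})
bucket.Object(key).delete()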