🔄 Building a Practical ETL Pipeline: Real-World Example with Customer Data 📊
Extract, Transform, Load (ETL) processes form the backbone of data engineering in organizations of all sizes. While the concept is straightforward, implementing an effective ETL pipeline in real-world scenarios involves numerous practical challenges. In this article, we'll walk through a complete, practical ETL example that you might encounter in a retail business.
🎯 The Business Scenario
Imagine you're a data engineer at RetailCo, a medium-sized retail company with both online and physical stores. The marketing team needs a unified customer view to better understand purchasing patterns and improve targeted promotions.
Currently, data exists in multiple systems:
- Customer information in a MySQL database
- Online purchases in JSON files from the e-commerce platform
- In-store purchases in CSV files from point-of-sale systems
Your task is to build an ETL pipeline that consolidates this data into a PostgreSQL data warehouse for analysis.
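Concretely, the three sources and the target can be captured in a small configuration block that the rest of the pipeline reads from. The sketch below only restates the hosts and paths used later in this article; in practice, credentials belong in environment variables or a secrets manager, not in code:

# Placeholder connection settings for the three sources and the warehouse
SOURCES = {
    "customers_mysql": {
        "host": "customer-db.retailco.com",
        "database": "customers",
        "user": "etl_user",
    },
    "online_purchases_json": {"path": "/data/online_purchases/"},
    "store_purchases_csv": {"path": "/data/store_purchases/"},
}
TARGET_WAREHOUSE = "postgresql://etl_user@warehouse.retailco.com:5432/retail_dw"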
📝 Requirements
The marketing team needs:
- A unified customer profile with demographics and contact information
- Complete purchase history across all channels
- Calculated metrics like average order value and purchase frequency
- Data refreshed daily for up-to-date analysis
Let's break down the ETL pipeline step by step.
🛠️ Step 1: Planning the Pipeline
Before writing any code, we need to design our data warehouse schema and identify the transformation rules:
# Target Schema Design (PostgreSQL)

# customers table
"""
CREATE TABLE customers (
    customer_id VARCHAR(20) PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    address TEXT,
    city VARCHAR(50),
    state VARCHAR(20),
    postal_code VARCHAR(20),
    country VARCHAR(50),
    customer_since DATE,
    source_system VARCHAR(20),
    last_updated TIMESTAMP
);
"""

# purchases table
"""
CREATE TABLE purchases (
    purchase_id VARCHAR(50) PRIMARY KEY,
    customer_id VARCHAR(20) REFERENCES customers(customer_id),
    purchase_date TIMESTAMP,
    store_id VARCHAR(20),
    channel VARCHAR(10),
    total_amount DECIMAL(10,2),
    payment_method VARCHAR(20),
    items JSONB,
    source_system VARCHAR(20),
    last_updated TIMESTAMP
);
"""

# customer_metrics table (derived)
"""
CREATE TABLE customer_metrics (
    customer_id VARCHAR(20) PRIMARY KEY REFERENCES customers(customer_id),
    total_purchases INT,
    total_spent DECIMAL(12,2),
    avg_order_value DECIMAL(10,2),
    first_purchase_date TIMESTAMP,
    last_purchase_date TIMESTAMP,
    favorite_store VARCHAR(20),
    favorite_category VARCHAR(50),
    purchase_frequency DECIMAL(10,2),
    last_updated TIMESTAMP
);
"""
📥 Step 2: Extracting Data from Sources
Next, we implement the extraction phase to pull data from each source:
import pandas as pd
import mysql.connector
import json
import os
from datetime import datetime, timedelta
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler("etl_log.log"), logging.StreamHandler()]
)
logger = logging.getLogger("RetailCo_ETL")

def extract_customer_data():
    """Extract customer data from MySQL database"""
    logger.info("Starting customer data extraction from MySQL")
    try:
        # Connect to MySQL database
        conn = mysql.connector.connect(
            host="customer-db.retailco.com",
            user="etl_user",
            password="secure_password",
            database="customers"
        )

        # Define extraction query - get only records updated since last ETL run
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=365 * 10)
        query = f"""
            SELECT
                customer_id,
                first_name,
                last_name,
                email,
                phone,
                address,
                city,
                state,
                postal_code,
                country,
                date_registered AS customer_since,
                'mysql_db' AS source_system,
                last_updated
            FROM
                customer_profiles
            WHERE
                last_updated > '{last_etl_date.strftime('%Y-%m-%d %H:%M:%S')}'
        """

        # Execute query and fetch into DataFrame
        df_customers = pd.read_sql(query, conn)
        conn.close()

        logger.info(f"Successfully extracted {len(df_customers)} customer records")
        return df_customers
    except Exception as e:
        logger.error(f"Error extracting customer data: {str(e)}")
        raise

def extract_online_purchases():
    """Extract online purchase data from JSON files"""
    logger.info("Starting online purchase extraction from JSON files")
    try:
        # Path to the directory containing JSON files
        json_dir = "/data/online_purchases/"

        # Get list of files updated since last ETL run
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=1)
        purchase_data = []

        # Process each JSON file
        for filename in os.listdir(json_dir):
            if filename.endswith(".json"):
                file_path = os.path.join(json_dir, filename)
                file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))

                # Only process files that were updated since last ETL run
                if file_modified > last_etl_date:
                    with open(file_path, "r") as f:
                        daily_purchases = json.load(f)
                        for purchase in daily_purchases:
                            purchase["source_system"] = "online_store"
                            purchase["channel"] = "online"
                            purchase_data.append(purchase)

        # Convert to DataFrame
        df_online = pd.DataFrame(purchase_data)
        logger.info(f"Successfully extracted {len(df_online)} online purchase records")
        return df_online
    except Exception as e:
        logger.error(f"Error extracting online purchase data: {str(e)}")
        raise

def extract_store_purchases():
    """Extract in-store purchase data from CSV files"""
    logger.info("Starting in-store purchase extraction from CSV files")
    try:
        # Path to the directory containing CSV files
        csv_dir = "/data/store_purchases/"

        # Get list of files updated since last ETL run
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=1)

        # Initialize an empty DataFrame
        df_store = pd.DataFrame()

        # Process each CSV file
        for filename in os.listdir(csv_dir):
            if filename.endswith(".csv"):
                file_path = os.path.join(csv_dir, filename)
                file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))

                # Only process files that were updated since last ETL run
                if file_modified > last_etl_date:
                    # Read CSV file
                    df_daily = pd.read_csv(file_path)

                    # Add source system and channel information
                    df_daily["source_system"] = "pos_system"
                    df_daily["channel"] = "store"

                    # Append to the main DataFrame
                    df_store = pd.concat([df_store, df_daily])

        logger.info(f"Successfully extracted {len(df_store)} in-store purchase records")
        return df_store
    except Exception as e:
        logger.error(f"Error extracting in-store purchase data: {str(e)}")
        raise

def get_last_etl_date():
    """Get the timestamp of the last successful ETL run"""
    try:
        with open("last_etl_run.txt", "r") as f:
            date_str = f.read().strip()
            return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    except (FileNotFoundError, ValueError):
        return None
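One detail worth tightening in production: the customer query above interpolates the last ETL timestamp directly into the SQL string. A safer variant passes it as a bound parameter, which both mysql.connector and pandas support. A minimal sketch (the function name is hypothetical and assumes an open connection to the same customer_profiles table):

def extract_customer_data_parameterized(conn, last_etl_date):
    """Same extraction as above, but with the cutoff passed as a bound parameter."""
    query = """
        SELECT customer_id, first_name, last_name, email, phone, address,
               city, state, postal_code, country,
               date_registered AS customer_since,
               'mysql_db' AS source_system, last_updated
        FROM customer_profiles
        WHERE last_updated > %s
    """
    # pandas forwards params to the DB-API driver (%s placeholders for MySQL)
    return pd.read_sql(query, conn, params=[last_etl_date])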
⚙️ Step 3: Transforming the Data
Now comes the crucial transformation step, where we clean, standardize, and enrich the data:
import ast  # used to safely parse the POS system's item lists

def transform_customer_data(df_customers):
    """Transform and standardize customer data"""
    logger.info("Transforming customer data")
    try:
        # Make a copy of the input DataFrame
        df = df_customers.copy()

        # Clean and standardize columns
        # Handle missing values
        df["phone"] = df["phone"].fillna("")

        # Standardize email addresses (lowercase)
        df["email"] = df["email"].str.lower()

        # Standardize names (proper case)
        df["first_name"] = df["first_name"].str.title()
        df["last_name"] = df["last_name"].str.title()

        # Standardize postal codes
        df["postal_code"] = df["postal_code"].astype(str).str.strip()

        # Add last_updated timestamp if not present
        if "last_updated" not in df.columns:
            df["last_updated"] = datetime.now()

        logger.info(f"Customer data transformation complete: {len(df)} records processed")
        return df
    except Exception as e:
        logger.error(f"Error transforming customer data: {str(e)}")
        raise

def transform_online_purchases(df_online):
    """Transform online purchase data"""
    logger.info("Transforming online purchase data")
    try:
        # Make a copy of the input DataFrame
        df = df_online.copy()

        # Rename columns to match target schema
        df = df.rename(columns={
            "order_id": "purchase_id",
            "order_date": "purchase_date",
            "order_total": "total_amount",
            "products": "items"
        })

        # Convert purchase date to datetime
        df["purchase_date"] = pd.to_datetime(df["purchase_date"])

        # Convert items to a JSON string where it isn't one already
        # (per-element check avoids double-encoding values that are already strings)
        df["items"] = df["items"].apply(
            lambda x: x if isinstance(x, str) else json.dumps(x)
        )

        # Add last_updated timestamp
        df["last_updated"] = datetime.now()

        logger.info(f"Online purchase data transformation complete: {len(df)} records processed")
        return df
    except Exception as e:
        logger.error(f"Error transforming online purchase data: {str(e)}")
        raise

def transform_store_purchases(df_store):
    """Transform in-store purchase data"""
    logger.info("Transforming in-store purchase data")
    try:
        # Make a copy of the input DataFrame
        df = df_store.copy()

        # Rename columns to match target schema
        df = df.rename(columns={
            "transaction_id": "purchase_id",
            "transaction_date": "purchase_date",
            "total": "total_amount",
            "payment_type": "payment_method",
            "product_list": "items"
        })

        # Convert purchase date to datetime
        df["purchase_date"] = pd.to_datetime(df["purchase_date"])

        # Convert items to proper JSON format
        # (ast.literal_eval safely parses the POS system's Python-literal strings;
        # unlike eval, it refuses anything other than plain literals)
        df["items"] = df["items"].apply(
            lambda x: json.dumps(ast.literal_eval(x)) if isinstance(x, str) else json.dumps(x)
        )

        # Add last_updated timestamp
        df["last_updated"] = datetime.now()

        logger.info(f"In-store purchase data transformation complete: {len(df)} records processed")
        return df
    except Exception as e:
        logger.error(f"Error transforming in-store purchase data: {str(e)}")
        raise

def create_customer_metrics(df_purchases):
    """Create derived customer metrics from purchase data"""
    logger.info("Generating customer metrics")
    try:
        # Group by customer_id
        grouped = df_purchases.groupby("customer_id")

        # Calculate metrics
        metrics = pd.DataFrame({
            "total_purchases": grouped.size(),
            "total_spent": grouped["total_amount"].sum(),
            "first_purchase_date": grouped["purchase_date"].min(),
            "last_purchase_date": grouped["purchase_date"].max(),
        })

        # Calculate average order value
        metrics["avg_order_value"] = metrics["total_spent"] / metrics["total_purchases"]

        # Reset index to make customer_id a column
        metrics = metrics.reset_index()

        # Calculate purchase frequency (days between first and last purchase / number of purchases)
        metrics["purchase_frequency"] = metrics.apply(
            lambda x: (x["last_purchase_date"] - x["first_purchase_date"]).days / x["total_purchases"]
            if x["total_purchases"] > 1 else 0,
            axis=1
        )

        # Find favorite store
        store_counts = df_purchases.groupby(["customer_id", "store_id"]).size().reset_index(name="count")
        favorite_stores = store_counts.loc[store_counts.groupby("customer_id")["count"].idxmax()]
        favorite_stores = favorite_stores[["customer_id", "store_id"]].rename(columns={"store_id": "favorite_store"})

        # Merge favorite store info
        metrics = pd.merge(metrics, favorite_stores, on="customer_id", how="left")

        # Extract favorite category from items
        def get_favorite_category(customer_id):
            customer_purchases = df_purchases[df_purchases["customer_id"] == customer_id]
            categories = []
            for items_json in customer_purchases["items"]:
                items = json.loads(items_json)
                for item in items:
                    if "category" in item:
                        categories.append(item["category"])
            if categories:
                from collections import Counter
                return Counter(categories).most_common(1)[0][0]
            return None

        # Apply to each customer
        metrics["favorite_category"] = metrics["customer_id"].apply(get_favorite_category)

        # Add last_updated timestamp
        metrics["last_updated"] = datetime.now()

        logger.info(f"Customer metrics generated: {len(metrics)} records processed")
        return metrics
    except Exception as e:
        logger.error(f"Error generating customer metrics: {str(e)}")
        raise
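Before wiring the transforms into the pipeline, it helps to smoke-test them on a handful of rows. A minimal example with made-up values that mirror the POS column names assumed above:

# Quick smoke test with an illustrative POS row
sample = pd.DataFrame([
    {
        "transaction_id": "T-1001",
        "customer_id": "C-42",
        "transaction_date": "2025-01-05 14:32:00",
        "store_id": "S-07",
        "total": 59.90,
        "payment_type": "credit_card",
        "product_list": "[{'sku': 'A1', 'category': 'shoes', 'price': 59.90}]",
        "source_system": "pos_system",
        "channel": "store",
    }
])

transformed = transform_store_purchases(sample)
print(transformed[["purchase_id", "purchase_date", "total_amount", "items"]])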
📤 Step 4: Loading the Transformed Data
Finally, we load the processed data into our PostgreSQL data warehouse:
from sqlalchemy import create_engine
import psycopg2

def load_data_to_warehouse(df_customers, df_purchases, df_metrics):
    """Load transformed data into PostgreSQL data warehouse"""
    logger.info("Starting data load to warehouse")
    try:
        # Create SQLAlchemy engine for PostgreSQL connection
        engine = create_engine(
            "postgresql://etl_user:secure_password@warehouse.retailco.com:5432/retail_dw"
        )

        # Load customers table - update existing records and insert new ones
        logger.info("Loading customer data...")

        # Get existing customer IDs
        existing_customers = pd.read_sql("SELECT customer_id FROM customers", engine)
        existing_ids = set(existing_customers["customer_id"])

        # Split DataFrame into new and existing customers
        df_new_customers = df_customers[~df_customers["customer_id"].isin(existing_ids)]
        df_existing_customers = df_customers[df_customers["customer_id"].isin(existing_ids)]

        # Insert new customers
        if not df_new_customers.empty:
            df_new_customers.to_sql("customers", engine, if_exists="append", index=False)
            logger.info(f"Inserted {len(df_new_customers)} new customers")

        # Update existing customers
        if not df_existing_customers.empty:
            connection = engine.raw_connection()
            cursor = connection.cursor()

            # Prepare temporary table for the update
            df_existing_customers.to_sql("customers_temp", engine, if_exists="replace", index=False)

            # Update existing records
            update_query = """
                UPDATE customers c
                SET
                    first_name = ct.first_name,
                    last_name = ct.last_name,
                    email = ct.email,
                    phone = ct.phone,
                    address = ct.address,
                    city = ct.city,
                    state = ct.state,
                    postal_code = ct.postal_code,
                    country = ct.country,
                    last_updated = ct.last_updated
                FROM
                    customers_temp ct
                WHERE
                    c.customer_id = ct.customer_id
            """
            cursor.execute(update_query)
            connection.commit()
            cursor.close()
            connection.close()
            logger.info(f"Updated {len(df_existing_customers)} existing customers")

        # Load purchases table
        logger.info("Loading purchase data...")

        # Get existing purchase IDs
        existing_purchases = pd.read_sql("SELECT purchase_id FROM purchases", engine)
        existing_purchase_ids = set(existing_purchases["purchase_id"])

        # Filter out already loaded purchases
        new_purchases = df_purchases[~df_purchases["purchase_id"].isin(existing_purchase_ids)]
        if not new_purchases.empty:
            new_purchases.to_sql("purchases", engine, if_exists="append", index=False)
            logger.info(f"Inserted {len(new_purchases)} new purchases")
        else:
            logger.info("No new purchases to load")

        # Load customer metrics table (replace approach)
        logger.info("Loading customer metrics...")
        df_metrics.to_sql("customer_metrics", engine, if_exists="replace", index=False)
        logger.info(f"Updated metrics for {len(df_metrics)} customers")

        # Update ETL timestamp
        with open("last_etl_run.txt", "w") as f:
            f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        logger.info("Data warehouse load completed successfully")
        return True
    except Exception as e:
        logger.error(f"Error loading data to warehouse: {str(e)}")
        raise
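The temp-table update works, but PostgreSQL's native upsert (INSERT ... ON CONFLICT) can replace the new/existing split with a single statement. A sketch of that alternative, assuming the same customers table and column list (the helper name is illustrative, not part of the pipeline above):

from sqlalchemy import text

def upsert_customers(df_customers, engine):
    """Alternative load path: a single INSERT ... ON CONFLICT statement per batch."""
    upsert_sql = text("""
        INSERT INTO customers (customer_id, first_name, last_name, email, phone,
                               address, city, state, postal_code, country,
                               customer_since, source_system, last_updated)
        VALUES (:customer_id, :first_name, :last_name, :email, :phone,
                :address, :city, :state, :postal_code, :country,
                :customer_since, :source_system, :last_updated)
        ON CONFLICT (customer_id) DO UPDATE SET
            first_name = EXCLUDED.first_name,
            last_name = EXCLUDED.last_name,
            email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            address = EXCLUDED.address,
            city = EXCLUDED.city,
            state = EXCLUDED.state,
            postal_code = EXCLUDED.postal_code,
            country = EXCLUDED.country,
            last_updated = EXCLUDED.last_updated
    """)
    if df_customers.empty:
        return
    with engine.begin() as conn:
        # Pass the rows as a list of dicts; SQLAlchemy executes them as a batch
        conn.execute(upsert_sql, df_customers.to_dict(orient="records"))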
🔄 Step 5: Putting It All Together
Now we combine all the components into a complete ETL pipeline:
def run_etl_pipeline():
    """Execute the full ETL pipeline"""
    logger.info("Starting RetailCo ETL pipeline")
    start_time = datetime.now()
    try:
        # Extract data from sources
        customers_raw = extract_customer_data()
        online_purchases_raw = extract_online_purchases()
        store_purchases_raw = extract_store_purchases()

        # Skip further processing if no data extracted
        if customers_raw.empty and online_purchases_raw.empty and store_purchases_raw.empty:
            logger.info("No new data to process. ETL pipeline completed.")
            return

        # Transform data
        customers_transformed = transform_customer_data(customers_raw)
        online_purchases_transformed = transform_online_purchases(online_purchases_raw)
        store_purchases_transformed = transform_store_purchases(store_purchases_raw)

        # Combine all purchase data
        all_purchases = pd.concat([online_purchases_transformed, store_purchases_transformed])

        # Generate customer metrics
        customer_metrics = create_customer_metrics(all_purchases)

        # Load data to warehouse
        load_data_to_warehouse(
            customers_transformed,
            all_purchases,
            customer_metrics
        )

        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.info(f"ETL pipeline completed successfully in {duration:.2f} seconds")
    except Exception as e:
        logger.error(f"ETL pipeline failed: {str(e)}")
        # Send alert if needed
        send_etl_failure_alert(str(e))
        raise

def send_etl_failure_alert(error_message):
    """Send an alert when ETL pipeline fails"""
    # Implementation depends on your alerting system
    # Could be email, Slack notification, etc.
    logger.info(f"Alert sent: ETL pipeline failure - {error_message}")

if __name__ == "__main__":
    run_etl_pipeline()
📊 Step 6: Scheduling and Monitoring
To complete our real-world implementation, we need to schedule the ETL pipeline and set up monitoring:
# Example cron job setup (Linux/Unix):
# 0 2 * * * /usr/bin/python3 /path/to/retail_etl.py > /path/to/etl_cron.log 2>&1

# For more sophisticated scheduling, consider using Airflow:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email': ['data_alerts@retailco.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'retail_customer_etl',
    default_args=default_args,
    description='ETL pipeline for RetailCo customer data',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False
)

etl_task = PythonOperator(
    task_id='run_retail_etl',
    python_callable=run_etl_pipeline,
    dag=dag,
)
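The heading promises monitoring as well as scheduling. Beyond the log file, a lightweight freshness check can run right after the load. The sketch below assumes the warehouse engine from Step 4; the function name and the 26-hour threshold are illustrative defaults for a daily pipeline:

def check_warehouse_freshness(engine, max_staleness_hours=26):
    """Basic post-load check: alert if the purchases table looks stale."""
    latest = pd.read_sql(
        "SELECT MAX(last_updated) AS latest FROM purchases", engine
    )["latest"].iloc[0]

    if pd.isnull(latest) or (datetime.now() - latest).total_seconds() > max_staleness_hours * 3600:
        send_etl_failure_alert(f"purchases table looks stale (latest load: {latest})")
        return False

    logger.info(f"Freshness check passed (latest purchase load: {latest})")
    return True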
🚀 Real-World Considerations
This example demonstrates a practical ETL pipeline, but real-world implementations often involve additional considerations:
1. Error Handling and Data Quality
In production, you'd want more robust error handling and data quality checks:
def validate_purchase_data(df):
    """Validate purchase data quality"""
    # Check for required columns
    required_cols = ['purchase_id', 'customer_id', 'purchase_date', 'total_amount']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # Check for duplicate purchase IDs
    duplicate_purchases = df['purchase_id'].duplicated().sum()
    if duplicate_purchases > 0:
        logger.warning(f"Found {duplicate_purchases} duplicate purchase IDs")
        # Remove duplicates
        df = df.drop_duplicates(subset=['purchase_id'])

    # Validate numeric columns
    if not pd.api.types.is_numeric_dtype(df['total_amount']):
        logger.warning("Converting total_amount to numeric")
        df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')

    # Check for missing values in critical columns
    missing_values = df[required_cols].isnull().sum().sum()
    if missing_values > 0:
        logger.warning(f"Found {missing_values} missing values in critical columns")

    return df
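In the pipeline above, the natural place to call this check is right after the purchase data is combined and before metrics are generated. A sketch of how run_etl_pipeline could be adjusted:

# Inside run_etl_pipeline, after combining the purchase data:
all_purchases = pd.concat([online_purchases_transformed, store_purchases_transformed])
all_purchases = validate_purchase_data(all_purchases)  # quality gate before metrics and load
customer_metrics = create_customer_metrics(all_purchases)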
2. Incremental Processing
For large datasets, you'll want to implement proper incremental processing:
def get_incremental_extract_query(table, last_etl_date, date_column="last_updated"):
    """Generate a query for incremental data extraction"""
    query = f"""
        SELECT *
        FROM {table}
        WHERE {date_column} > '{last_etl_date.strftime('%Y-%m-%d %H:%M:%S')}'
    """
    return query
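String interpolation is shown here for brevity; the table and column names must come from trusted configuration, and the cutoff date is better passed as a bound parameter as discussed in Step 2. A usage sketch against the customer_profiles table, assuming an open mysql.connector connection like the one created in Step 2:

# Pull only rows changed since the last run
last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=1)
query = get_incremental_extract_query("customer_profiles", last_etl_date)
df_incremental = pd.read_sql(query, conn)  # conn: an open mysql.connector connection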
3. Performance Optimization
Processing large datasets might require chunking:
def process_in_chunks(extract_func, transform_func, chunk_size=50000):
    """Process large datasets in chunks"""
    offset = 0
    more_data = True
    results = []

    while more_data:
        # Extract chunk
        chunk = extract_func(offset, chunk_size)

        # If no more data, exit loop
        if len(chunk) == 0:
            more_data = False
            continue

        # Transform chunk
        transformed_chunk = transform_func(chunk)

        # Add to results
        results.append(transformed_chunk)

        # Increment offset
        offset += chunk_size

    # Combine results if needed
    if results:
        return pd.concat(results)
    return pd.DataFrame()
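For this to work, the extract function has to accept an offset and a chunk size. A sketch of a paged extractor against the customer_profiles table (the function is hypothetical; LIMIT/OFFSET is simple, though keyset pagination scales better on very large tables):

def extract_customers_page(offset, chunk_size):
    """Hypothetical paged extractor for use with process_in_chunks."""
    query = f"""
        SELECT *
        FROM customer_profiles
        ORDER BY customer_id
        LIMIT {chunk_size} OFFSET {offset}
    """
    return pd.read_sql(query, conn)  # conn: an open MySQL connection as in Step 2

# Usage
df_customers = process_in_chunks(extract_customers_page, transform_customer_data)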
4. Handling Data Anomalies
Real data often contains anomalies that need special handling:
def handle_currency_conversion(df):
    """Convert different currencies to a standard currency"""
    # Define conversion rates
    conversion_rates = {
        'USD': 1.0,
        'EUR': 1.1,
        'GBP': 1.3,
        'CAD': 0.75
    }

    # Apply conversion to standardize on USD
    df['total_amount_usd'] = df.apply(
        lambda row: row['total_amount'] * conversion_rates.get(row['currency'], 1.0),
        axis=1
    )
    return df
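A quick illustration with made-up rows; the hard-coded rates are only suitable for an example, and production pipelines usually pull rates from a reference table or an FX service:

mixed = pd.DataFrame([
    {"purchase_id": "P1", "total_amount": 100.0, "currency": "USD"},
    {"purchase_id": "P2", "total_amount": 100.0, "currency": "EUR"},
    {"purchase_id": "P3", "total_amount": 100.0, "currency": "JPY"},  # unknown currency falls back to 1.0
])
print(handle_currency_conversion(mixed)[["purchase_id", "currency", "total_amount_usd"]])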
🎓 Lessons Learned
This practical ETL example demonstrates several important principles:
- Source Diversity: Real-world ETL often involves multiple data sources with different formats and structures.
- Incremental Loading: Processing only new or changed data is essential for efficiency.
- Error Handling: Robust error handling and logging are critical for production pipelines.
- Data Quality: Validation and cleaning are necessary to handle real-world data inconsistencies.
- Scalability: Design your pipeline to handle increasing data volumes through techniques like chunking.
- Monitoring: Proper logging and alerting ensure you know when issues occur.
🏁 Conclusion
Building a practical ETL pipeline requires careful consideration of real-world complexities. This example demonstrates how to extract customer and purchase data from multiple sources, transform it to create a unified view, and load it into a data warehouse for analysis.
By following a structured approach and addressing common challenges like data quality, incremental processing, and performance optimization, you can create reliable ETL pipelines that deliver valuable business insights.
Remember that ETL is not a one-size-fits-all process. Each implementation should be tailored to your specific data sources, business requirements, and technical constraints. The patterns and techniques shown in this example provide a solid foundation that you can adapt to your own ETL challenges.