🔄 Building a Practical ETL Pipeline: Real-World Example with Customer Data

Extract, Transform, Load (ETL) processes form the backbone of data engineering in organizations of all sizes. While the concept is straightforward, implementing an effective ETL pipeline in real-world scenarios involves numerous practical challenges. In this article, we'll walk through a complete, practical ETL example that you might encounter in a retail business.

🎯 The Business Scenario

Imagine you're a data engineer at RetailCo, a medium-sized retail company with both online and physical stores. The marketing team needs a unified customer view to better understand purchasing patterns and improve targeted promotions.

Currently, data exists in multiple systems:

  • Customer information in a MySQL database
  • Online purchases in JSON files from the e-commerce platform
  • In-store purchases in CSV files from point-of-sale systems

Your task is to build an ETL pipeline that consolidates this data into a PostgreSQL data warehouse for analysis.
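
To make the sources concrete, here is roughly what a single record from each purchase feed might look like. These samples are illustrative assumptions only; the field names are inferred from the column renames applied in the transformation step later in the article.

# Sample online order (one element of a daily JSON file from the e-commerce platform)
{
    "order_id": "WEB-100234",
    "customer_id": "CUST-00017",
    "order_date": "2025-01-14T18:32:05",
    "order_total": 84.97,
    "payment_method": "credit_card",
    "products": [{"sku": "SKU-112", "category": "apparel", "quantity": 2, "price": 24.99}]
}

# Sample in-store transaction (one row of a daily CSV export from the point-of-sale system)
transaction_id,customer_id,store_id,transaction_date,total,payment_type,product_list
POS-558812,CUST-00017,STORE-07,2025-01-14 11:02:44,31.50,cash,"[{'sku': 'SKU-990', 'category': 'grocery', 'quantity': 3, 'price': 10.50}]"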

📝 Requirements

The marketing team needs:

  1. A unified customer profile with demographics and contact information
  2. Complete purchase history across all channels
  3. Calculated metrics like average order value and purchase frequency
  4. Data refreshed daily for up-to-date analysis

Let's break down the ETL pipeline step by step.

🛠️ Step 1: Planning the Pipeline

Before writing any code, we need to design our data warehouse schema and identify the transformation rules:

# Target Schema Design (PostgreSQL)

# customers table
"""
CREATE TABLE customers (
    customer_id VARCHAR(20) PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    address TEXT,
    city VARCHAR(50),
    state VARCHAR(20),
    postal_code VARCHAR(20),
    country VARCHAR(50),
    customer_since DATE,
    source_system VARCHAR(20),
    last_updated TIMESTAMP
);
"""

# purchases table
"""
CREATE TABLE purchases (
    purchase_id VARCHAR(50) PRIMARY KEY,
    customer_id VARCHAR(20) REFERENCES customers(customer_id),
    purchase_date TIMESTAMP,
    store_id VARCHAR(20),
    channel VARCHAR(10),
    total_amount DECIMAL(10,2),
    payment_method VARCHAR(20),
    items JSONB,
    source_system VARCHAR(20),
    last_updated TIMESTAMP
);
"""

# customer_metrics table (derived)
"""
CREATE TABLE customer_metrics (
    customer_id VARCHAR(20) PRIMARY KEY REFERENCES customers(customer_id),
    total_purchases INT,
    total_spent DECIMAL(12,2),
    avg_order_value DECIMAL(10,2),
    first_purchase_date TIMESTAMP,
    last_purchase_date TIMESTAMP,
    favorite_store VARCHAR(20),
    favorite_category VARCHAR(50),
    purchase_frequency DECIMAL(10,2),
    last_updated TIMESTAMP
);
"""

📥 Step 2: Extracting Data from Sources

Next, we implement the extraction phase to pull data from each source:

import pandas as pd
import mysql.connector
import json
import ast  # for safely parsing the Python-literal item lists in the POS CSV exports
import os
from collections import Counter
from datetime import datetime, timedelta
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler("etl_log.log"), logging.StreamHandler()]
)
logger = logging.getLogger("RetailCo_ETL")

def extract_customer_data():
    """Extract customer data from MySQL database"""
    
    logger.info("Starting customer data extraction from MySQL")
    
    try:
        # Connect to MySQL database
        conn = mysql.connector.connect(
            host="customer-db.retailco.com",
            user="etl_user",
            password="secure_password",
            database="customers"
        )
        
        # Define extraction query - get only records updated since last ETL run
        # (fall back to a ~10-year window, effectively a full load, on the first run)
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=365*10)
        query = f"""
        SELECT 
            customer_id, 
            first_name, 
            last_name, 
            email, 
            phone, 
            address, 
            city, 
            state, 
            postal_code, 
            country, 
            date_registered AS customer_since, 
            'mysql_db' AS source_system, 
            last_updated
        FROM 
            customer_profiles
        WHERE 
            last_updated > '{last_etl_date.strftime('%Y-%m-%d %H:%M:%S')}'
        """
        
        # Execute query and fetch into DataFrame
        df_customers = pd.read_sql(query, conn)
        conn.close()
        
        logger.info(f"Successfully extracted {len(df_customers)} customer records")
        return df_customers
        
    except Exception as e:
        logger.error(f"Error extracting customer data: {str(e)}")
        raise

def extract_online_purchases():
    """Extract online purchase data from JSON files"""
    
    logger.info("Starting online purchase extraction from JSON files")
    
    try:
        # Path to the directory containing JSON files
        json_dir = "/data/online_purchases/"
        
        # Get list of files updated since last ETL run
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=1)
        
        purchase_data = []
        
        # Process each JSON file
        for filename in os.listdir(json_dir):
            if filename.endswith(".json"):
                file_path = os.path.join(json_dir, filename)
                file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
                
                # Only process files that were updated since last ETL run
                if file_modified > last_etl_date:
                    with open(file_path, "r") as f:
                        daily_purchases = json.load(f)
                        for purchase in daily_purchases:
                            purchase["source_system"] = "online_store"
                            purchase["channel"] = "online"
                            purchase_data.append(purchase)
        
        # Convert to DataFrame
        df_online = pd.DataFrame(purchase_data)
        
        logger.info(f"Successfully extracted {len(df_online)} online purchase records")
        return df_online
        
    except Exception as e:
        logger.error(f"Error extracting online purchase data: {str(e)}")
        raise

def extract_store_purchases():
    """Extract in-store purchase data from CSV files"""
    
    logger.info("Starting in-store purchase extraction from CSV files")
    
    try:
        # Path to the directory containing CSV files
        csv_dir = "/data/store_purchases/"
        
        # Get list of files updated since last ETL run
        last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=1)
        
        # Initialize an empty DataFrame
        df_store = pd.DataFrame()
        
        # Process each CSV file
        for filename in os.listdir(csv_dir):
            if filename.endswith(".csv"):
                file_path = os.path.join(csv_dir, filename)
                file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
                
                # Only process files that were updated since last ETL run
                if file_modified > last_etl_date:
                    # Read CSV file
                    df_daily = pd.read_csv(file_path)
                    
                    # Add source system and channel information
                    df_daily["source_system"] = "pos_system"
                    df_daily["channel"] = "store"
                    
                    # Append to the main DataFrame
                    df_store = pd.concat([df_store, df_daily])
        
        logger.info(f"Successfully extracted {len(df_store)} in-store purchase records")
        return df_store
        
    except Exception as e:
        logger.error(f"Error extracting in-store purchase data: {str(e)}")
        raise

def get_last_etl_date():
    """Get the timestamp of the last successful ETL run"""
    try:
        with open("last_etl_run.txt", "r") as f:
            date_str = f.read().strip()
            return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    except (FileNotFoundError, ValueError):
        return None

⚙️ Step 3: Transforming the Data

Now comes the crucial transformation step, where we clean, standardize, and enrich the data:

def transform_customer_data(df_customers):
    """Transform and standardize customer data"""
    
    logger.info("Transforming customer data")
    
    try:
        # Make a copy of the input DataFrame
        df = df_customers.copy()
        
        # Clean and standardize columns
        
        # Handle missing values
        df["phone"] = df["phone"].fillna("")
        
        # Standardize email addresses (lowercase)
        df["email"] = df["email"].str.lower()
        
        # Standardize names (proper case)
        df["first_name"] = df["first_name"].str.title()
        df["last_name"] = df["last_name"].str.title()
        
        # Standardize postal codes (fill missing values first so NaN does not become the string "nan")
        df["postal_code"] = df["postal_code"].fillna("").astype(str).str.strip()
        
        # Add last_updated timestamp if not present
        if "last_updated" not in df.columns:
            df["last_updated"] = datetime.now()
            
        logger.info(f"Customer data transformation complete: {len(df)} records processed")
        return df
        
    except Exception as e:
        logger.error(f"Error transforming customer data: {str(e)}")
        raise

def transform_online_purchases(df_online):
    """Transform online purchase data"""
    
    logger.info("Transforming online purchase data")
    
    try:
        # Make a copy of the input DataFrame
        df = df_online.copy()
        
        # Rename columns to match target schema
        df = df.rename(columns={
            "order_id": "purchase_id",
            "order_date": "purchase_date",
            "order_total": "total_amount",
            "products": "items"
        })
        
        # Convert purchase date to datetime
        df["purchase_date"] = pd.to_datetime(df["purchase_date"])
        
        # Serialize items to JSON strings, leaving values that are already strings untouched
        df["items"] = df["items"].apply(lambda x: x if isinstance(x, str) else json.dumps(x))
        
        # Add last_updated timestamp
        df["last_updated"] = datetime.now()
        
        logger.info(f"Online purchase data transformation complete: {len(df)} records processed")
        return df
        
    except Exception as e:
        logger.error(f"Error transforming online purchase data: {str(e)}")
        raise

def transform_store_purchases(df_store):
    """Transform in-store purchase data"""
    
    logger.info("Transforming in-store purchase data")
    
    try:
        # Make a copy of the input DataFrame
        df = df_store.copy()
        
        # Rename columns to match target schema
        df = df.rename(columns={
            "transaction_id": "purchase_id",
            "transaction_date": "purchase_date",
            "total": "total_amount",
            "payment_type": "payment_method",
            "product_list": "items"
        })
        
        # Convert purchase date to datetime
        df["purchase_date"] = pd.to_datetime(df["purchase_date"])
        
        # Convert items to proper JSON format (ast.literal_eval parses the Python-literal
        # strings from the POS export without executing arbitrary code, unlike eval)
        df["items"] = df["items"].apply(lambda x: json.dumps(ast.literal_eval(x)) if isinstance(x, str) else json.dumps(x))
        
        # Add last_updated timestamp
        df["last_updated"] = datetime.now()
        
        logger.info(f"In-store purchase data transformation complete: {len(df)} records processed")
        return df
        
    except Exception as e:
        logger.error(f"Error transforming in-store purchase data: {str(e)}")
        raise

def create_customer_metrics(df_purchases):
    """Create derived customer metrics from purchase data"""
    
    logger.info("Generating customer metrics")
    
    try:
        # Group by customer_id
        grouped = df_purchases.groupby("customer_id")
        
        # Calculate metrics
        metrics = pd.DataFrame({
            "total_purchases": grouped.size(),
            "total_spent": grouped["total_amount"].sum(),
            "first_purchase_date": grouped["purchase_date"].min(),
            "last_purchase_date": grouped["purchase_date"].max(),
        })
        
        # Calculate average order value
        metrics["avg_order_value"] = metrics["total_spent"] / metrics["total_purchases"]
        
        # Reset index to make customer_id a column
        metrics = metrics.reset_index()
        
        # Calculate purchase frequency (days between first and last purchase / number of purchases)
        metrics["purchase_frequency"] = metrics.apply(
            lambda x: (x["last_purchase_date"] - x["first_purchase_date"]).days / x["total_purchases"] 
            if x["total_purchases"] > 1 else 0, 
            axis=1
        )
        
        # Find favorite store
        store_counts = df_purchases.groupby(["customer_id", "store_id"]).size().reset_index(name="count")
        favorite_stores = store_counts.loc[store_counts.groupby("customer_id")["count"].idxmax()]
        favorite_stores = favorite_stores[["customer_id", "store_id"]].rename(columns={"store_id": "favorite_store"})
        
        # Merge favorite store info
        metrics = pd.merge(metrics, favorite_stores, on="customer_id", how="left")
        
        # Extract favorite category from items
        def get_favorite_category(customer_id):
            customer_purchases = df_purchases[df_purchases["customer_id"] == customer_id]
            categories = []
            
            for items_json in customer_purchases["items"]:
                items = json.loads(items_json)
                for item in items:
                    if "category" in item:
                        categories.append(item["category"])
            
            if categories:
                return Counter(categories).most_common(1)[0][0]
            return None
        
        # Apply to each customer
        metrics["favorite_category"] = metrics["customer_id"].apply(get_favorite_category)
        
        # Add last_updated timestamp
        metrics["last_updated"] = datetime.now()
        
        logger.info(f"Customer metrics generated: {len(metrics)} records processed")
        return metrics
        
    except Exception as e:
        logger.error(f"Error generating customer metrics: {str(e)}")
        raise

📤 Step 4: Loading the Transformed Data

Finally, we load the processed data into our PostgreSQL data warehouse:

from sqlalchemy import create_engine
import psycopg2

def load_data_to_warehouse(df_customers, df_purchases, df_metrics):
    """Load transformed data into PostgreSQL data warehouse"""
    
    logger.info("Starting data load to warehouse")
    
    try:
        # Create SQLAlchemy engine for PostgreSQL connection
        engine = create_engine(
            "postgresql://etl_user:secure_password@warehouse.retailco.com:5432/retail_dw"
        )
        
        # Load customers table - update existing records and insert new ones
        logger.info("Loading customer data...")
        
        # Get existing customer IDs
        existing_customers = pd.read_sql("SELECT customer_id FROM customers", engine)
        existing_ids = set(existing_customers["customer_id"])
        
        # Split DataFrame into new and existing customers
        df_new_customers = df_customers[~df_customers["customer_id"].isin(existing_ids)]
        df_existing_customers = df_customers[df_customers["customer_id"].isin(existing_ids)]
        
        # Insert new customers
        if not df_new_customers.empty:
            df_new_customers.to_sql("customers", engine, if_exists="append", index=False)
            logger.info(f"Inserted {len(df_new_customers)} new customers")
        
        # Update existing customers
        if not df_existing_customers.empty:
            connection = engine.raw_connection()
            cursor = connection.cursor()
            
            # Prepare temporary table for the update
            df_existing_customers.to_sql("customers_temp", engine, if_exists="replace", index=False)
            
            # Update existing records
            update_query = """
            UPDATE customers c
            SET 
                first_name = ct.first_name,
                last_name = ct.last_name,
                email = ct.email,
                phone = ct.phone,
                address = ct.address,
                city = ct.city,
                state = ct.state,
                postal_code = ct.postal_code,
                country = ct.country,
                last_updated = ct.last_updated
            FROM 
                customers_temp ct
            WHERE 
                c.customer_id = ct.customer_id
            """
            
            cursor.execute(update_query)
            # Remove the staging table now that the update has been applied
            cursor.execute("DROP TABLE IF EXISTS customers_temp")
            connection.commit()
            cursor.close()
            connection.close()
            
            logger.info(f"Updated {len(df_existing_customers)} existing customers")
        
        # Load purchases table
        logger.info("Loading purchase data...")
        
        # Get existing purchase IDs
        existing_purchases = pd.read_sql("SELECT purchase_id FROM purchases", engine)
        existing_purchase_ids = set(existing_purchases["purchase_id"])
        
        # Filter out already loaded purchases
        new_purchases = df_purchases[~df_purchases["purchase_id"].isin(existing_purchase_ids)]
        
        if not new_purchases.empty:
            new_purchases.to_sql("purchases", engine, if_exists="append", index=False)
            logger.info(f"Inserted {len(new_purchases)} new purchases")
        else:
            logger.info("No new purchases to load")
        
        # Load customer metrics table (replace approach)
        logger.info("Loading customer metrics...")
        df_metrics.to_sql("customer_metrics", engine, if_exists="replace", index=False)
        logger.info(f"Updated metrics for {len(df_metrics)} customers")
        
        # Update ETL timestamp
        with open("last_etl_run.txt", "w") as f:
            f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        
        logger.info("Data warehouse load completed successfully")
        return True
        
    except Exception as e:
        logger.error(f"Error loading data to warehouse: {str(e)}")
        raise

🔄 Step 5: Putting It All Together

Now we combine all the components into a complete ETL pipeline:

def run_etl_pipeline():
    """Execute the full ETL pipeline"""
    
    logger.info("Starting RetailCo ETL pipeline")
    start_time = datetime.now()
    
    try:
        # Extract data from sources
        customers_raw = extract_customer_data()
        online_purchases_raw = extract_online_purchases()
        store_purchases_raw = extract_store_purchases()
        
        # Skip further processing if no data extracted
        if customers_raw.empty and online_purchases_raw.empty and store_purchases_raw.empty:
            logger.info("No new data to process. ETL pipeline completed.")
            return
        
        # Transform data
        customers_transformed = transform_customer_data(customers_raw)
        online_purchases_transformed = transform_online_purchases(online_purchases_raw)
        store_purchases_transformed = transform_store_purchases(store_purchases_raw)
        
        # Combine all purchase data
        all_purchases = pd.concat([online_purchases_transformed, store_purchases_transformed])
        
        # Generate customer metrics
        customer_metrics = create_customer_metrics(all_purchases)
        
        # Load data to warehouse
        load_data_to_warehouse(
            customers_transformed, 
            all_purchases, 
            customer_metrics
        )
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        logger.info(f"ETL pipeline completed successfully in {duration:.2f} seconds")
        
    except Exception as e:
        logger.error(f"ETL pipeline failed: {str(e)}")
        # Send alert if needed
        send_etl_failure_alert(str(e))
        raise

def send_etl_failure_alert(error_message):
    """Send an alert when ETL pipeline fails"""
    # Implementation depends on your alerting system
    # Could be email, Slack notification, etc.
    logger.info(f"Alert sent: ETL pipeline failure - {error_message}")

if __name__ == "__main__":
    run_etl_pipeline()

📊 Step 6: Scheduling and Monitoring

To complete our real-world implementation, we need to schedule the ETL pipeline and set up monitoring:

# Example cron job setup (Linux/Unix):
# 0 2 * * * /usr/bin/python3 /path/to/retail_etl.py > /path/to/etl_cron.log 2>&1

# For more sophisticated scheduling, consider using Airflow:
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email': ['data_alerts@retailco.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'retail_customer_etl',
    default_args=default_args,
    description='ETL pipeline for RetailCo customer data',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False
)

etl_task = PythonOperator(
    task_id='run_retail_etl',
    python_callable=run_etl_pipeline,
    dag=dag,
)
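
email_on_failure covers hard failures, but it will not catch a run that "succeeds" while loading nothing. A simple freshness check against the warehouse can close that gap. The sketch below queries the schema from Step 1 and reuses the pipeline's existing send_etl_failure_alert hook; the 26-hour threshold is an assumption (one daily run plus some slack), and it could run as a follow-up Airflow task or a separate job.

from datetime import datetime, timedelta
from sqlalchemy import create_engine, text

def check_warehouse_freshness(max_age_hours=26):
    """Alert if the purchases table has not received fresh rows recently."""
    engine = create_engine(
        "postgresql://etl_user:secure_password@warehouse.retailco.com:5432/retail_dw"
    )
    with engine.connect() as conn:
        latest = conn.execute(text("SELECT MAX(last_updated) FROM purchases")).scalar()

    if latest is None or datetime.now() - latest > timedelta(hours=max_age_hours):
        # Reuse the pipeline's existing alerting hook
        send_etl_failure_alert(f"Warehouse data looks stale: last purchase update was {latest}")
        return False
    return True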

🚀 Real-World Considerations

This example demonstrates a practical ETL pipeline, but real-world implementations often involve additional considerations:

1. Error Handling and Data Quality

In production, you'd want more robust error handling and data quality checks:

def validate_purchase_data(df):
    """Validate purchase data quality"""
    
    # Check for required columns
    required_cols = ['purchase_id', 'customer_id', 'purchase_date', 'total_amount']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    
    # Check for duplicate purchase IDs
    duplicate_purchases = df['purchase_id'].duplicated().sum()
    if duplicate_purchases > 0:
        logger.warning(f"Found {duplicate_purchases} duplicate purchase IDs")
        # Remove duplicates
        df = df.drop_duplicates(subset=['purchase_id'])
    
    # Validate numeric columns
    if not pd.api.types.is_numeric_dtype(df['total_amount']):
        logger.warning("Converting total_amount to numeric")
        df['total_amount'] = pd.to_numeric(df['total_amount'], errors='coerce')
    
    # Check for missing values in critical columns
    missing_values = df[required_cols].isnull().sum().sum()
    if missing_values > 0:
        logger.warning(f"Found {missing_values} missing values in critical columns")
    
    return df
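
A sensible place to call this check is in run_etl_pipeline, right after the two purchase DataFrames are combined and before any metrics are derived or data is loaded:

        # Combine all purchase data
        all_purchases = pd.concat([online_purchases_transformed, store_purchases_transformed])

        # Validate and clean before metrics are derived or anything is loaded
        all_purchases = validate_purchase_data(all_purchases)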

2. Incremental Processing

For large datasets, you'll want to implement proper incremental processing:

def get_incremental_extract_query(table, last_etl_date, date_column="last_updated"):
    """Generate a query for incremental data extraction"""
    
    query = f"""
    SELECT *
    FROM {table}
    WHERE {date_column} > '{last_etl_date.strftime('%Y-%m-%d %H:%M:%S')}'
    """
    
    return query
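
For example, extract_customer_data could build its query with this helper; the column aliasing done in the original hand-written SQL (such as date_registered AS customer_since) would then move into the transform step. Note that, like the original, this interpolates the timestamp directly into the SQL string, which is acceptable for a trusted internal watermark but anything user-supplied should go through a parameterized query instead.

# Inside extract_customer_data, the hand-written SQL could be replaced with:
last_etl_date = get_last_etl_date() or datetime.now() - timedelta(days=365 * 10)
query = get_incremental_extract_query("customer_profiles", last_etl_date)
df_customers = pd.read_sql(query, conn)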

3. Performance Optimization

Processing large datasets might require chunking:

def process_in_chunks(extract_func, transform_func, chunk_size=50000):
    """Process large datasets in chunks"""
    
    offset = 0
    more_data = True
    results = []
    
    while more_data:
        # Extract chunk
        chunk = extract_func(offset, chunk_size)
        
        # If no more data, exit loop
        if len(chunk) == 0:
            more_data = False
            continue
        
        # Transform chunk
        transformed_chunk = transform_func(chunk)
        
        # Add to results
        results.append(transformed_chunk)
        
        # Increment offset
        offset += chunk_size
    
    # Combine results if needed
    if results:
        return pd.concat(results)
    return pd.DataFrame()
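
process_in_chunks assumes extract_func accepts an offset and a chunk size. Here is a hedged sketch of a matching extractor for the MySQL customer table, using the same connection details as Step 2; ORDER BY keeps the paging deterministic. A production version would keep one connection open across chunks; this keeps the sketch self-contained.

def extract_customers_chunk(offset, chunk_size):
    """Fetch one page of customer rows, for use with process_in_chunks above."""
    conn = mysql.connector.connect(
        host="customer-db.retailco.com",
        user="etl_user",
        password="secure_password",
        database="customers"
    )
    query = f"""
    SELECT *
    FROM customer_profiles
    ORDER BY customer_id
    LIMIT {chunk_size} OFFSET {offset}
    """
    chunk = pd.read_sql(query, conn)
    conn.close()
    return chunk

# Usage:
# df_customers = process_in_chunks(extract_customers_chunk, transform_customer_data)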

4. Handling Data Anomalies

Real data often contains anomalies that need special handling:

def handle_currency_conversion(df):
    """Convert different currencies to a standard currency"""
    
    # Define conversion rates
    conversion_rates = {
        'USD': 1.0,
        'EUR': 1.1,
        'GBP': 1.3,
        'CAD': 0.75
    }
    
    # Apply conversion to standardize on USD
    df['total_amount_usd'] = df.apply(
        lambda row: row['total_amount'] * conversion_rates.get(row['currency'], 1.0),
        axis=1
    )
    
    return df

🎓 Lessons Learned

This practical ETL example demonstrates several important principles:

  1. Source Diversity: Real-world ETL often involves multiple data sources with different formats and structures.

  2. Incremental Loading: Processing only new or changed data is essential for efficiency.

  3. Error Handling: Robust error handling and logging are critical for production pipelines.

  4. Data Quality: Validation and cleaning are necessary to handle real-world data inconsistencies.

  5. Scalability: Design your pipeline to handle increasing data volumes through techniques like chunking.

  6. Monitoring: Proper logging and alerting ensure you know when issues occur.

🏁 Conclusion

Building a practical ETL pipeline requires careful consideration of real-world complexities. This example demonstrates how to extract customer and purchase data from multiple sources, transform it to create a unified view, and load it into a data warehouse for analysis.

By following a structured approach and addressing common challenges like data quality, incremental processing, and performance optimization, you can create reliable ETL pipelines that deliver valuable business insights.

Remember that ETL is not a one-size-fits-all process. Each implementation should be tailored to your specific data sources, business requirements, and technical constraints. The patterns and techniques shown in this example provide a solid foundation that you can adapt to your own ETL challenges.
