🔄 Building ETL Pipelines for CSV Files in Python

CSV (Comma-Separated Values) files remain one of the most common formats for data exchange across industries. Whether you're working with customer data, sales transactions, or IoT sensor readings, you'll likely encounter CSV files. In this guide, we'll explore how to build efficient ETL (Extract, Transform, Load) pipelines specifically designed for processing CSV data.

📥 The Extract Phase: Reading CSV Files

The extraction phase involves reading data from source CSV files. Python offers several approaches to do this effectively.

Using Pandas

Pandas is often the go-to library for CSV processing due to its powerful DataFrame structure:

import pandas as pd

# Basic CSV reading
df = pd.read_csv('data.csv')

# With additional options
df = pd.read_csv(
    'data.csv',
    delimiter=',',           # Specify delimiter (comma is default)
    encoding='utf-8',        # Handle character encoding
    skiprows=1,              # Skip rows at the start of the file (e.g., notes above the header)
    usecols=['id', 'name'],  # Read only specific columns
    na_values=['NA', 'N/A'], # Define what should be treated as NaN
    dtype={'id': int}        # Specify data types
)

💡 Tip: Before processing the entire file, preview the first few rows with pd.read_csv('file.csv', nrows=5) to verify column names and data types.
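
For example, a quick preview might look like this (assuming a file named data.csv):

# Peek at the first five rows and the dtypes pandas infers for them
preview = pd.read_csv('data.csv', nrows=5)
print(preview.head())
print(preview.dtypes)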

For Large Files: Chunk Processing

When dealing with CSV files too large to fit in memory, use chunking:

# Process CSV in chunks
chunk_size = 10000
chunks = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # Process each chunk
    processed_chunk = transform_data(chunk)
    chunks.append(processed_chunk)
    
# Combine processed chunks (if needed)
result = pd.concat(chunks, ignore_index=True)
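
Note that concatenating every processed chunk pulls the full result back into memory. If the output is also too large to hold at once, one option is to append each chunk to the destination file as it is processed. A minimal sketch, assuming a transform_data function like the one above:

# Stream each processed chunk straight to the output file
output_path = 'processed_large_file.csv'

for i, chunk in enumerate(pd.read_csv('large_file.csv', chunksize=10000)):
    processed_chunk = transform_data(chunk)
    processed_chunk.to_csv(
        output_path,
        mode='w' if i == 0 else 'a',  # overwrite on the first chunk, append afterwards
        header=(i == 0),              # write the header only once
        index=False
    )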

Alternative: Using CSV Module

For more control or when Pandas isn't suitable:

import csv

with open('data.csv', 'r', newline='', encoding='utf-8') as file:  # newline='' is recommended when using the csv module
    csv_reader = csv.DictReader(file)
    
    for row in csv_reader:
        # Process each row as a dictionary
        print(f"ID: {row['id']}, Name: {row['name']}")

⚙️ The Transform Phase: Processing CSV Data

Transformation is where you convert your raw CSV data into a format suitable for analysis or storage.

Common CSV Transformations

Here are some typical transformations for CSV data:

def transform_csv_data(df):
    # Data cleaning
    df = df.dropna(subset=['required_column'])  # Remove rows with missing values
    
    # Data type conversion
    df['date'] = pd.to_datetime(df['date'])
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    
    # Feature engineering
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    
    # Renaming columns
    df = df.rename(columns={'old_name': 'new_name'})
    
    # Filtering data
    df = df[df['amount'] > 0]
    
    return df

Data Validation

Always validate your CSV data before processing:

def validate_csv_data(df):
    # Check for required columns
    required_cols = ['id', 'date', 'amount']
    missing_cols = [col for col in required_cols if col not in df.columns]
    
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    
    # Validate data types
    if not pd.api.types.is_numeric_dtype(df['amount']):
        raise TypeError("Column 'amount' must contain numeric values")
    
    # Check for duplicates
    duplicates = df['id'].duplicated().sum()
    if duplicates > 0:
        print(f"Warning: Found {duplicates} duplicate IDs")
    
    return True
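
One way to wire these two helpers together, validating before transforming:

# Run validation first, then transform only if the checks pass
df = pd.read_csv('data.csv')

if validate_csv_data(df):
    df = transform_csv_data(df)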

📤 The Load Phase: Storing Processed Data

After transformation, the final step is loading your processed data into a destination system.

Writing to a New CSV

The simplest approach is writing to a new CSV file:

# Export to CSV
processed_df.to_csv(
    'processed_data.csv',
    index=False,           # Don't write row indices
    encoding='utf-8',      # Specify encoding
    date_format='%Y-%m-%d' # Format date columns
)

Loading to a Database

For more robust storage, load to a database:

from sqlalchemy import create_engine, Integer, String

# Create database connection
engine = create_engine('postgresql://username:password@localhost:5432/database')

# Write DataFrame to SQL table
df.to_sql(
    'table_name',
    engine,
    if_exists='replace',  # Options: fail, replace, append
    index=False,
    dtype={               # Specify SQL column types if needed
        'id': Integer,
        'name': String(100)
    }
)
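
For large DataFrames, to_sql also accepts a chunksize argument, and on many backends method='multi' batches rows into multi-row INSERT statements. A hedged sketch of a batched load:

# Load in batches of 5,000 rows instead of a single large insert
df.to_sql(
    'table_name',
    engine,
    if_exists='append',
    index=False,
    chunksize=5000,    # rows written per batch
    method='multi'     # multi-row INSERTs; not supported by every driver
)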

🔄 Building a Complete ETL Pipeline

Now let's put everything together into a reusable ETL pipeline:

class CSVETLPipeline:
    def __init__(self, input_file, output_file=None, db_connection=None):
        self.input_file = input_file
        self.output_file = output_file
        self.db_connection = db_connection
        self.logger = self._setup_logging()
    
    def _setup_logging(self):
        # Configure logging
        import logging
        logger = logging.getLogger("CSV_ETL")
        logger.setLevel(logging.INFO)
        if not logger.handlers:  # avoid duplicate handlers if the pipeline is instantiated more than once
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger
    
    def extract(self):
        """Extract data from CSV file"""
        self.logger.info(f"Extracting data from {self.input_file}")
        try:
            df = pd.read_csv(self.input_file)
            self.logger.info(f"Successfully extracted {len(df)} rows")
            return df
        except Exception as e:
            self.logger.error(f"Extraction failed: {str(e)}")
            raise
    
    def transform(self, df):
        """Apply transformations to the data"""
        self.logger.info("Transforming data")
        try:
            # Data cleaning
            df = df.dropna(subset=['id'])
            
            # Apply transformations (customize as needed)
            for col in df.select_dtypes(include=['object']).columns:
                df[col] = df[col].str.strip()
            
            # Data type conversions
            if 'date' in df.columns:
                df['date'] = pd.to_datetime(df['date'], errors='coerce')
            
            # More transformations...
            
            self.logger.info(f"Transformation complete, {len(df)} rows remaining")
            return df
        except Exception as e:
            self.logger.error(f"Transformation failed: {str(e)}")
            raise
    
    def load(self, df):
        """Load data to destination"""
        if self.output_file:
            self.logger.info(f"Loading data to CSV: {self.output_file}")
            df.to_csv(self.output_file, index=False)
            
        if self.db_connection:
            self.logger.info("Loading data to database")
            table_name = self.output_file.split('.')[0] if self.output_file else "processed_data"
            df.to_sql(table_name, self.db_connection, if_exists='replace', index=False)
            
        self.logger.info(f"Successfully loaded {len(df)} rows")
        return True
    
    def run(self):
        """Execute the complete ETL pipeline"""
        try:
            df = self.extract()
            transformed_df = self.transform(df)
            self.load(transformed_df)
            self.logger.info("ETL pipeline completed successfully")
            return True
        except Exception as e:
            self.logger.error(f"ETL pipeline failed: {str(e)}")
            return False

To use this pipeline:

# Creating and running the pipeline
etl = CSVETLPipeline(
    input_file='raw_data.csv',
    output_file='processed_data.csv'
)
success = etl.run()
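
The same pipeline can also load to a database by passing a SQLAlchemy engine (the connection string below is a placeholder):

from sqlalchemy import create_engine

engine = create_engine('postgresql://username:password@localhost:5432/database')

etl = CSVETLPipeline(
    input_file='raw_data.csv',
    output_file='processed_data.csv',
    db_connection=engine
)
success = etl.run()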

🚀 Performance Optimization Tips

Working with large CSV files can be challenging. Here are some tips to optimize your ETL pipeline:

1. Use Appropriate Data Types

# Specify dtypes when reading to reduce memory usage
dtypes = {
    'id': 'int32',
    'name': 'string',  # Pandas 1.0+ string dtype
    'amount': 'float32'
}

df = pd.read_csv('large_file.csv', dtype=dtypes)
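
To see what explicit dtypes buy you, compare the memory footprint before and after (memory_usage is standard pandas):

# Compare memory usage with default versus explicit dtypes
df_default = pd.read_csv('large_file.csv')
df_typed = pd.read_csv('large_file.csv', dtype=dtypes)

print(f"Default dtypes:  {df_default.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"Explicit dtypes: {df_typed.memory_usage(deep=True).sum() / 1e6:.1f} MB")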

2. Leverage Parallelization

For multi-core processing:

import dask.dataframe as dd

# Read CSV with Dask
dask_df = dd.read_csv('large_file.csv')

# Parallel processing
result = dask_df.map_partitions(transform_function).compute()

3. Use Generators for Memory Efficiency

def process_csv_rows(filename):
    with open(filename, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Process one row at a time
            yield transform_row(row)
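
A minimal way to consume this generator and write results as they are produced, assuming transform_row returns a dictionary with a known set of keys:

import csv

def run_row_by_row_etl(input_file, output_file, fieldnames):
    with open(output_file, 'w', newline='', encoding='utf-8') as out:
        writer = csv.DictWriter(out, fieldnames=fieldnames)
        writer.writeheader()
        for transformed_row in process_csv_rows(input_file):
            writer.writerow(transformed_row)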

🔍 Error Handling and Validation

Robust CSV ETL pipelines need proper error handling:

⚠️ Important: CSV files from external sources often contain unexpected data issues. Always implement comprehensive validation and error handling.

Handling Common CSV Issues

def safe_csv_import(file_path):
    try:
        # Try standard reading first (UTF-8 by default)
        return pd.read_csv(file_path)
    except UnicodeDecodeError:
        # Fall back to other common encodings
        for encoding in ['latin1', 'cp1252']:
            try:
                return pd.read_csv(file_path, encoding=encoding)
            except UnicodeDecodeError:
                continue
        raise ValueError(f"Could not decode {file_path} with any attempted encoding")
    except pd.errors.ParserError:
        # Skip malformed lines and emit a warning for each (pandas 1.3+)
        return pd.read_csv(file_path, on_bad_lines='warn')

📈 Monitoring ETL Pipeline Progress

For large files, add progress tracking:

from tqdm import tqdm

def etl_with_progress(file_path, chunk_size=10000):
    # Get total number of rows
    with open(file_path, 'r') as f:
        total_rows = sum(1 for _ in f) - 1  # Subtract header
    
    # Process with progress bar
    chunks_processed = 0
    rows_processed = 0
    
    # Create progress bar
    with tqdm(total=total_rows, desc="ETL Progress") as pbar:
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            # Process chunk
            processed_chunk = transform_data(chunk)
            load_data(processed_chunk)
            
            # Update progress
            rows_in_chunk = len(chunk)
            rows_processed += rows_in_chunk
            chunks_processed += 1
            pbar.update(rows_in_chunk)
    
    return {
        "rows_processed": rows_processed,
        "chunks_processed": chunks_processed
    }

🏁 Conclusion

Building effective ETL pipelines for CSV files involves careful consideration of extraction methods, transformation rules, and loading destinations. By using the patterns and techniques outlined in this guide, you can create reliable, efficient pipelines that handle everything from small data files to large-scale CSV processing tasks.

Remember these key points:

  • Choose the right extraction approach based on your file size and requirements
  • Apply transformations systematically with proper validation
  • Consider performance optimization for large files
  • Implement robust error handling
  • Monitor your pipeline execution

With these practices in place, you'll be well-equipped to handle CSV data processing for any data engineering project.
