Build a Simple ETL Pipeline: Extract, Transform, Load
Data rarely arrives clean and ready to use. Sales figures come from a CSV file, customer info from an API, and inventory records from a legacy system. Before anyone can analyze this data, someone has to pull it together, clean it up, and store it in a usable format. That process is called ETL — Extract, Transform, Load.
ETL pipelines are the backbone of data engineering. Every company that uses data has them running behind the scenes, often on a schedule. In this project, you'll build a complete ETL pipeline that extracts data from multiple simulated sources, validates and transforms it, and loads it into a SQLite database.
Step 1: Extract Data from Multiple Sources
The "E" in ETL stands for Extract. In the real world, this means pulling data from databases, APIs, files, and services. Since we can't access real APIs in the browser, we'll simulate multiple data sources using Python data structures — dictionaries for API-like data and multi-line strings for CSV-like data.
The key principle: extraction should be source-agnostic. Your pipeline should convert every source into a common format (a list of dictionaries) so downstream steps don't need to know where the data came from.
Create two extraction functions:
1. extract_from_api(data) — takes a list of dictionaries, prints "Extracted N records from API", returns the list.
2. extract_from_csv(csv_string) — takes a CSV string, parses it into a list of dicts (converting id to int and amount to float), prints "Extracted N records from CSV", returns the list.
Combine results from both sources into all_records. Print the total count as "Total records: N" and print each record's id and name.
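Here is one possible sketch of these two functions. It assumes the CSV string has a header row of id,name,email,amount and that the "API" data is already a list of dictionaries; adjust the column names to match your own sample data.

```python
import csv
import io

def extract_from_api(data):
    # The "API" is simulated: data is already a list of dicts.
    print(f"Extracted {len(data)} records from API")
    return data

def extract_from_csv(csv_string):
    # Parse the CSV text into dicts, coercing id and amount to numbers.
    records = []
    for row in csv.DictReader(io.StringIO(csv_string.strip())):
        row["id"] = int(row["id"])
        row["amount"] = float(row["amount"])
        records.append(row)
    print(f"Extracted {len(records)} records from CSV")
    return records

api_data = [{"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5}]
csv_data = "id,name,email,amount\n2,Bob Smith,bob@example.com,80.0"

all_records = extract_from_api(api_data) + extract_from_csv(csv_data)
print(f"Total records: {len(all_records)}")
for record in all_records:
    print(record["id"], record["name"])
```

Notice that both functions return the same shape of data, a list of dictionaries, so the rest of the pipeline never needs to care which source a record came from.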
Step 2: Validate the Extracted Data
Data from the real world is messy. Emails might be missing, amounts might be negative, names could be empty strings. Before transforming data, you should validate it and flag any problems. Some issues are warnings (missing email), while others are errors (negative amount).
Write a function validate_records(records) that checks each record for:
1. name must not be empty or missing — if invalid, add to errors list.
2. amount must be a positive number — if invalid, add to errors list.
3. email should not be empty — if empty, add to warnings list.
Return a tuple: (valid_records, errors, warnings) where valid_records excludes records with errors (warnings are OK).
Test with sample data that includes at least one invalid record (empty name or negative amount). Print "Valid: N", "Errors: N", "Warnings: N".
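A minimal sketch of validate_records is below. It assumes each record carries id, name, amount, and email keys; the exact wording of the error and warning messages is up to you.

```python
def validate_records(records):
    valid_records, errors, warnings = [], [], []
    for record in records:
        record_errors = []
        # Rule 1: name must be present and non-empty.
        if not record.get("name", "").strip():
            record_errors.append(f"Record {record.get('id')}: missing name")
        # Rule 2: amount must be a positive number.
        amount = record.get("amount")
        if not isinstance(amount, (int, float)) or amount <= 0:
            record_errors.append(f"Record {record.get('id')}: amount must be positive")
        # Rule 3: empty email is only a warning, not an error.
        if not record.get("email"):
            warnings.append(f"Record {record.get('id')}: missing email")
        if record_errors:
            errors.extend(record_errors)
        else:
            valid_records.append(record)
    return valid_records, errors, warnings

sample = [
    {"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5},
    {"id": 2, "name": "", "email": "bob@example.com", "amount": 80.0},   # error: empty name
    {"id": 3, "name": "Carol Lee", "email": "", "amount": 45.0},         # warning: no email
]
valid, errs, warns = validate_records(sample)
print(f"Valid: {len(valid)}")
print(f"Errors: {len(errs)}")
print(f"Warnings: {len(warns)}")
```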
Step 3: Transform — Clean and Standardize
The "T" in ETL is often the most complex step. Transformation includes cleaning (fixing inconsistencies), standardizing (making formats uniform), and enriching (adding derived fields). Let's start with cleaning and standardizing.
Common cleaning tasks include trimming whitespace, standardizing capitalization, fixing data types, and handling missing values. The goal is to make every record follow the same format.
Write a function clean_records(records) that transforms each record:
1. Trim whitespace from name and email.
2. Convert name to title case (e.g., "alice johnson" becomes "Alice Johnson").
3. Convert email to lowercase.
4. Round amount to 2 decimal places.
5. If email is empty, set it to "unknown@placeholder.com".
Return the list of cleaned records. Test with messy data and print the cleaned results.
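One way to sketch clean_records is shown below. Copying each record before modifying it is a design choice, not a requirement of the spec; it keeps the original extracted data untouched in case you need to rerun a step.

```python
def clean_records(records):
    cleaned = []
    for record in records:
        rec = dict(record)  # copy so the original record is not mutated
        rec["name"] = rec.get("name", "").strip().title()
        rec["email"] = rec.get("email", "").strip().lower()
        rec["amount"] = round(rec.get("amount", 0), 2)
        if not rec["email"]:
            rec["email"] = "unknown@placeholder.com"
        cleaned.append(rec)
    return cleaned

messy = [
    {"id": 1, "name": "  alice johnson ", "email": " ALICE@Example.COM ", "amount": 120.456},
    {"id": 2, "name": "bob smith", "email": "", "amount": 80.0},
]
for rec in clean_records(messy):
    print(rec)
```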
Step 4: Transform — Derive New Fields
Beyond cleaning, transformations often add new fields that make the data more useful for analysis. These derived fields are calculated from existing data. For example, you might categorize amounts into tiers, extract the domain from email addresses, or add processing timestamps.
Write a function enrich_records(records) that adds these derived fields to each record:
1. amount_tier — "low" if amount < 100, "medium" if 100–250, "high" if > 250.
2. email_domain — the part after @ in the email (e.g., "example.com").
3. name_parts — number of words in the name.
4. processed — set to True.
Return the enriched list. Test with sample data and print each record showing all new fields.
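A sketch of enrich_records follows. It assumes amounts of exactly 100 or 250 fall into the "medium" tier, and that records have already been cleaned so every email contains an "@".

```python
def enrich_records(records):
    enriched = []
    for record in records:
        rec = dict(record)
        amount = rec["amount"]
        # Tier boundaries: < 100 is low, 100-250 is medium, > 250 is high.
        if amount < 100:
            rec["amount_tier"] = "low"
        elif amount <= 250:
            rec["amount_tier"] = "medium"
        else:
            rec["amount_tier"] = "high"
        rec["email_domain"] = rec["email"].split("@")[-1]  # part after the @
        rec["name_parts"] = len(rec["name"].split())       # word count of the name
        rec["processed"] = True
        enriched.append(rec)
    return enriched

sample = [{"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5}]
for rec in enrich_records(sample):
    print(rec["id"], rec["amount_tier"], rec["email_domain"], rec["name_parts"], rec["processed"])
```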
Step 5: Load into SQLite
The "L" in ETL means loading transformed data into its final destination. For us, that's a SQLite database. We'll create a table that matches our enriched record structure and insert all records in a batch.
Write a function load_to_database(records) that:
1. Creates an in-memory SQLite database.
2. Creates a customers table with columns: id (INTEGER PRIMARY KEY), name (TEXT), email (TEXT), amount (REAL), amount_tier (TEXT), email_domain (TEXT).
3. Inserts all records using executemany().
4. Commits and returns the connection.
Then query the database to verify: print the row count and all rows. Print "Loaded N records into database".
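Here is a sketch of load_to_database using the standard sqlite3 module. The column order in the INSERT statement is an assumption based on the table definition above, and the sample records are illustrative.

```python
import sqlite3

def load_to_database(records):
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE customers (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT,
            amount REAL,
            amount_tier TEXT,
            email_domain TEXT
        )
    """)
    rows = [
        (r["id"], r["name"], r["email"], r["amount"], r["amount_tier"], r["email_domain"])
        for r in records
    ]
    conn.executemany("INSERT INTO customers VALUES (?, ?, ?, ?, ?, ?)", rows)
    conn.commit()
    print(f"Loaded {len(rows)} records into database")
    return conn

records = [
    {"id": 1, "name": "Alice Johnson", "email": "alice@example.com",
     "amount": 120.5, "amount_tier": "medium", "email_domain": "example.com"},
    {"id": 2, "name": "Bob Smith", "email": "bob@example.com",
     "amount": 80.0, "amount_tier": "low", "email_domain": "example.com"},
]
conn = load_to_database(records)
print("Row count:", conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0])
for row in conn.execute("SELECT * FROM customers"):
    print(row)
```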
Step 6: Query and Verify the Loaded Data
After loading data, you should always verify it. Run queries to check row counts, look for unexpected NULLs, and generate summary statistics. This is the quality assurance step that catches loading errors before anyone uses the data.
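As a rough illustration (separate from the exercise code in the next paragraph), verification usually boils down to a handful of aggregate queries against the table you just loaded, assuming the customers schema from Step 5:

```python
import sqlite3

# Build a tiny customers table so the verification queries have something to run against.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT, "
             "amount REAL, amount_tier TEXT, email_domain TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?, ?, ?, ?)",
    [(1, "Alice Johnson", "alice@example.com", 120.5, "medium", "example.com"),
     (2, "Bob Smith", "bob@example.com", 80.0, "low", "example.com")],
)

# Check 1: total row count matches what was loaded.
print("Rows:", conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0])

# Check 2: no unexpected NULLs in required columns.
print("NULL emails:", conn.execute("SELECT COUNT(*) FROM customers WHERE email IS NULL").fetchone()[0])

# Check 3: summary statistics per tier via GROUP BY.
for tier, count, total in conn.execute(
    "SELECT amount_tier, COUNT(*), SUM(amount) FROM customers GROUP BY amount_tier"
):
    print(tier, count, total)
```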
Study the code below. It loads data into SQLite and runs verification queries. What will be printed? Pay close attention to the SQL aggregations and GROUP BY results.
Step 7: Build the Complete ETLPipeline Class
Now let's assemble everything into a single ETLPipeline class. A class-based pipeline is easier to configure, test, and rerun. It tracks statistics from each step so you can monitor the pipeline's health.
Build an ETLPipeline class with these methods:
1. __init__(self) — initializes a stats dict: {"extracted": 0, "validated": 0, "errors": 0, "transformed": 0, "loaded": 0}.
2. extract(self, api_data, csv_data) — combines both sources, updates stats, returns records.
3. validate(self, records) — filters out records with empty name or non-positive amount, updates stats, returns valid records.
4. transform(self, records) — cleans (strip, title case names, lowercase emails) and adds amount_tier, updates stats, returns transformed records.
5. load(self, records) — creates SQLite :memory: db, loads records, stores connection as self.conn, updates stats.
6. run(self, api_data, csv_data) — runs all steps in order, prints a summary report.
Test by running the pipeline with sample data. The summary should print "=== PIPELINE SUMMARY ===" and each stat.
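A compact sketch of what this class could look like is below. The sample data, the summary print format, and the simplified customers schema (no email_domain column) are assumptions; wire in the fuller logic from your earlier steps as needed.

```python
import csv
import io
import sqlite3

class ETLPipeline:
    def __init__(self):
        self.stats = {"extracted": 0, "validated": 0, "errors": 0,
                      "transformed": 0, "loaded": 0}
        self.conn = None

    def extract(self, api_data, csv_data):
        records = list(api_data)
        for row in csv.DictReader(io.StringIO(csv_data.strip())):
            row["id"] = int(row["id"])
            row["amount"] = float(row["amount"])
            records.append(row)
        self.stats["extracted"] = len(records)
        return records

    def validate(self, records):
        # Drop records with an empty name or a non-positive amount.
        valid = [r for r in records
                 if r.get("name", "").strip() and r.get("amount", 0) > 0]
        self.stats["validated"] = len(valid)
        self.stats["errors"] = len(records) - len(valid)
        return valid

    def transform(self, records):
        transformed = []
        for r in records:
            rec = dict(r)
            rec["name"] = rec["name"].strip().title()
            rec["email"] = rec.get("email", "").strip().lower()
            amount = rec["amount"]
            rec["amount_tier"] = ("low" if amount < 100
                                  else "medium" if amount <= 250 else "high")
            transformed.append(rec)
        self.stats["transformed"] = len(transformed)
        return transformed

    def load(self, records):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, "
                          "name TEXT, email TEXT, amount REAL, amount_tier TEXT)")
        self.conn.executemany(
            "INSERT INTO customers VALUES (?, ?, ?, ?, ?)",
            [(r["id"], r["name"], r["email"], r["amount"], r["amount_tier"])
             for r in records])
        self.conn.commit()
        self.stats["loaded"] = len(records)

    def run(self, api_data, csv_data):
        records = self.extract(api_data, csv_data)
        valid = self.validate(records)
        transformed = self.transform(valid)
        self.load(transformed)
        print("=== PIPELINE SUMMARY ===")
        for key, value in self.stats.items():
            print(f"{key}: {value}")

api_data = [{"id": 1, "name": " alice johnson ", "email": "ALICE@Example.com", "amount": 120.5}]
csv_data = "id,name,email,amount\n2,Bob Smith,bob@example.com,80.0\n3,,bad@example.com,-5"
ETLPipeline().run(api_data, csv_data)
```

Running this prints the summary header followed by one line per stat; the third CSV row is intentionally invalid so the errors count is non-zero.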
Project Complete!
You've built a complete ETL pipeline from scratch. Here's what you accomplished: