
Build a Simple ETL Pipeline: Extract, Transform, Load

Advanced · 40 min · 7 exercises · 140 XP

Data rarely arrives clean and ready to use. Sales come from a CSV, customer info from an API, and inventory from a legacy system. Before anyone can analyze this data, someone has to pull it together, clean it up, and store it in a usable format. That process is called ETL — Extract, Transform, Load.

ETL pipelines are the backbone of data engineering. Every company that uses data has them running behind the scenes, often on a schedule. In this project, you'll build a complete ETL pipeline that extracts data from multiple simulated sources, validates and transforms it, and loads it into a SQLite database.

Step 1: Extract Data from Multiple Sources

The "E" in ETL stands for Extract. In the real world, this means pulling data from databases, APIs, files, and services. Since we can't access real APIs in the browser, we'll simulate multiple data sources using Python data structures — dictionaries for API-like data and multi-line strings for CSV-like data.

The key principle: extraction should be source-agnostic. Your pipeline should convert every source into a common format (a list of dictionaries) so downstream steps don't need to know where the data came from.
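
As a concrete illustration of that common format, here is what two simulated sources might look like before and after extraction (the field names mirror the exercises below; the sample values are made up):

```python
# API-like source: already a list of dictionaries.
api_data = [
    {"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.50},
]

# CSV-like source: a multi-line string that still needs parsing.
csv_data = """id,name,email,amount
2,Bob Smith,bob@example.com,75.00"""

# After extraction, both sources should be reduced to the same shape:
# [{"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5}, ...]
```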

Exercise 1: Build the Extraction Layer
Write Code

Create two extraction functions:

1. extract_from_api(data) — takes a list of dictionaries, prints "Extracted N records from API", returns the list.

2. extract_from_csv(csv_string) — takes a CSV string, parses it into a list of dicts (converting id to int and amount to float), prints "Extracted N records from CSV", returns the list.

Combine results from both sources into all_records. Print the total count as "Total records: N" and print each record's id and name.
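
One possible sketch of these functions (the sample records are illustrative, not the course's dataset):

```python
import csv
import io

def extract_from_api(data):
    """Simulated API extraction: the data is already a list of dicts."""
    print(f"Extracted {len(data)} records from API")
    return data

def extract_from_csv(csv_string):
    """Parse a CSV string into a list of dicts, converting numeric fields."""
    records = []
    for row in csv.DictReader(io.StringIO(csv_string.strip())):
        row["id"] = int(row["id"])
        row["amount"] = float(row["amount"])
        records.append(row)
    print(f"Extracted {len(records)} records from CSV")
    return records

api_data = [{"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5}]
csv_data = "id,name,email,amount\n2,Bob Smith,bob@example.com,75.0"

all_records = extract_from_api(api_data) + extract_from_csv(csv_data)
print(f"Total records: {len(all_records)}")
for record in all_records:
    print(record["id"], record["name"])
```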


Step 2: Validate the Extracted Data

Data from the real world is messy. Emails might be missing, amounts might be negative, names could be empty strings. Before transforming data, you should validate it and flag any problems. Some issues are warnings (missing email), while others are errors (negative amount).

Exercise 2: Data Validation
Write Code

Write a function validate_records(records) that checks each record for:

1. name must not be empty or missing — if invalid, add to errors list.

2. amount must be a positive number — if invalid, add to errors list.

3. email should not be empty — if empty, add to warnings list.

Return a tuple: (valid_records, errors, warnings) where valid_records excludes records with errors (warnings are OK).

Test with sample data that includes at least one invalid record (empty name or negative amount). Print "Valid: N", "Errors: N", "Warnings: N".
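
One way validate_records might look (the sample data and error-message wording are illustrative; this version counts one error per failed check, which is a judgment call):

```python
def validate_records(records):
    """Separate clean records from those with errors; collect warnings."""
    valid_records, errors, warnings = [], [], []
    for record in records:
        record_errors = []
        # Errors: empty/missing name, non-positive or non-numeric amount.
        if not record.get("name"):
            record_errors.append(f"Record {record.get('id')}: missing name")
        amount = record.get("amount")
        if not isinstance(amount, (int, float)) or amount <= 0:
            record_errors.append(f"Record {record.get('id')}: invalid amount {amount}")
        # Warning only: an empty email does not exclude the record.
        if not record.get("email"):
            warnings.append(f"Record {record.get('id')}: missing email")
        if record_errors:
            errors.extend(record_errors)
        else:
            valid_records.append(record)
    return valid_records, errors, warnings

sample = [
    {"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5},
    {"id": 2, "name": "", "email": "bob@example.com", "amount": -5.0},
    {"id": 3, "name": "Cara Lee", "email": "", "amount": 40.0},
]
valid, errs, warns = validate_records(sample)
print(f"Valid: {len(valid)}")
print(f"Errors: {len(errs)}")
print(f"Warnings: {len(warns)}")
```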


Step 3: Transform — Clean and Standardize

The "T" in ETL is often the most complex step. Transformation includes cleaning (fixing inconsistencies), standardizing (making formats uniform), and enriching (adding derived fields). Let's start with cleaning and standardizing.

Common cleaning tasks include trimming whitespace, standardizing capitalization, fixing data types, and handling missing values. The goal is to make every record follow the same format.

Exercise 3: Clean and Standardize Data
Write Code

Write a function clean_records(records) that transforms each record:

1. Trim whitespace from name and email.

2. Convert name to title case (e.g., "alice johnson" becomes "Alice Johnson").

3. Convert email to lowercase.

4. Round amount to 2 decimal places.

5. If email is empty, set it to "unknown@placeholder.com".

Return the list of cleaned records. Test with messy data and print the cleaned results.
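
A possible sketch of clean_records (the placeholder email comes from the exercise spec; the messy test record is made up):

```python
def clean_records(records):
    """Return new records with trimmed, standardized fields."""
    cleaned = []
    for record in records:
        item = dict(record)  # copy so the input list is not mutated
        item["name"] = item.get("name", "").strip().title()
        email = item.get("email", "").strip().lower()
        item["email"] = email if email else "unknown@placeholder.com"
        item["amount"] = round(float(item.get("amount", 0)), 2)
        cleaned.append(item)
    return cleaned

messy = [{"id": 1, "name": "  alice johnson ", "email": " ALICE@Example.COM ", "amount": 120.456}]
for record in clean_records(messy):
    print(record)
```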


Step 4: Transform — Derive New Fields

Beyond cleaning, transformations often add new fields that make the data more useful for analysis. These derived fields are calculated from existing data. For example, you might categorize amounts into tiers, extract the domain from email addresses, or add processing timestamps.

Exercise 4: Enrich Records with Derived Fields
Write Code

Write a function enrich_records(records) that adds these derived fields to each record:

1. amount_tier — "low" if amount < 100, "medium" if 100–250 (inclusive), "high" if > 250.

2. email_domain — the part after @ in the email (e.g., "example.com").

3. name_parts — number of words in the name.

4. processed — set to True.

Return the enriched list. Test with sample data and print each record showing all new fields.
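
A sketch of enrich_records under those rules (boundary values of 100 and 250 are treated as "medium" here; the sample record is illustrative):

```python
def enrich_records(records):
    """Add derived fields to each (already cleaned) record."""
    enriched = []
    for record in records:
        item = dict(record)
        amount = item["amount"]
        if amount < 100:
            item["amount_tier"] = "low"
        elif amount <= 250:
            item["amount_tier"] = "medium"
        else:
            item["amount_tier"] = "high"
        item["email_domain"] = item["email"].split("@")[-1]
        item["name_parts"] = len(item["name"].split())
        item["processed"] = True
        enriched.append(item)
    return enriched

sample = [{"id": 1, "name": "Alice Johnson", "email": "alice@example.com", "amount": 120.5}]
for record in enrich_records(sample):
    print(record["id"], record["amount_tier"], record["email_domain"],
          record["name_parts"], record["processed"])
```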


Step 5: Load into SQLite

The "L" in ETL means loading transformed data into its final destination. For us, that's a SQLite database. We'll create a table that matches our enriched record structure and insert all records in a batch.

Exercise 5: Load Data into SQLite
Write Code

Write a function load_to_database(records) that:

1. Creates an in-memory SQLite database.

2. Creates a customers table with columns: id (INTEGER PRIMARY KEY), name (TEXT), email (TEXT), amount (REAL), amount_tier (TEXT), email_domain (TEXT).

3. Inserts all records using executemany().

4. Commits and returns the connection.

Then query the database to verify: print the row count and all rows. Print "Loaded N records into database".
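
One way load_to_database might be sketched (the sample record is illustrative; only the columns listed above are stored):

```python
import sqlite3

def load_to_database(records):
    """Create the customers table in an in-memory database and bulk-insert."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE customers (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT,
            amount REAL,
            amount_tier TEXT,
            email_domain TEXT
        )
    """)
    rows = [
        (r["id"], r["name"], r["email"], r["amount"], r["amount_tier"], r["email_domain"])
        for r in records
    ]
    conn.executemany("INSERT INTO customers VALUES (?, ?, ?, ?, ?, ?)", rows)
    conn.commit()
    print(f"Loaded {len(rows)} records into database")
    return conn

records = [
    {"id": 1, "name": "Alice Johnson", "email": "alice@example.com",
     "amount": 120.5, "amount_tier": "medium", "email_domain": "example.com"},
]
conn = load_to_database(records)
print("Row count:", conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0])
for row in conn.execute("SELECT * FROM customers"):
    print(row)
```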


Step 6: Query and Verify the Loaded Data

After loading data, you should always verify it. Run queries to check row counts, look for unexpected NULLs, and generate summary statistics. This is the quality assurance step that catches loading errors before anyone uses the data.
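
This is not the exercise's hidden code, but verification queries usually look something like the sketch below. The setup just mirrors the Step 5 schema so the snippet runs on its own; in the project you would reuse the connection returned by load_to_database.

```python
import sqlite3

# Minimal setup so the sketch is self-contained (schema and rows are illustrative).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT, "
             "amount REAL, amount_tier TEXT, email_domain TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?, ?, ?, ?)",
    [(1, "Alice Johnson", "alice@example.com", 120.5, "medium", "example.com"),
     (2, "Bob Smith", "bob@example.com", 75.0, "low", "example.com")],
)
conn.commit()

# Typical checks: row count, unexpected NULLs, and a per-tier summary.
row_count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
null_emails = conn.execute("SELECT COUNT(*) FROM customers WHERE email IS NULL").fetchone()[0]
summary = conn.execute(
    "SELECT amount_tier, COUNT(*), ROUND(SUM(amount), 2) "
    "FROM customers GROUP BY amount_tier ORDER BY amount_tier"
).fetchall()

print("Rows:", row_count)
print("NULL emails:", null_emails)
for tier, count, total in summary:
    print(tier, count, total)
```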

Exercise 6: Verify and Analyze Loaded Data
Predict Output

Study the code in the exercise editor. It loads data into SQLite and runs verification queries. What will be printed? Pay close attention to the SQL aggregations and GROUP BY results.


Step 7: Build the Complete ETLPipeline Class

Now let's assemble everything into a single ETLPipeline class. A class-based pipeline is easier to configure, test, and rerun. It tracks statistics from each step so you can monitor the pipeline's health.

Exercise 7: The Complete ETLPipeline Class
Write Code

Build an ETLPipeline class with these methods:

1. __init__(self) — initializes stats dict: {extracted: 0, validated: 0, errors: 0, transformed: 0, loaded: 0}.

2. extract(self, api_data, csv_data) — combines both sources, updates stats, returns records.

3. validate(self, records) — filters out records with empty name or non-positive amount, updates stats, returns valid records.

4. transform(self, records) — cleans (strip, title case names, lowercase emails) and adds amount_tier, updates stats, returns transformed records.

5. load(self, records) — creates SQLite :memory: db, loads records, stores connection as self.conn, updates stats.

6. run(self, api_data, csv_data) — runs all steps in order, prints a summary report.

Test by running the pipeline with sample data. The summary should print "=== PIPELINE SUMMARY ===" and each stat.
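
A compact sketch of how the pieces might fit together (the sample data and CSV parsing details are illustrative; the exercise may expect slightly different output formatting):

```python
import csv
import io
import sqlite3


class ETLPipeline:
    """A minimal sketch; method names and stats keys follow the exercise spec."""

    def __init__(self):
        self.stats = {"extracted": 0, "validated": 0, "errors": 0,
                      "transformed": 0, "loaded": 0}
        self.conn = None

    def extract(self, api_data, csv_data):
        # Combine the API-like list with parsed CSV rows into one list of dicts.
        records = list(api_data)
        for row in csv.DictReader(io.StringIO(csv_data.strip())):
            row["id"] = int(row["id"])
            row["amount"] = float(row["amount"])
            records.append(row)
        self.stats["extracted"] = len(records)
        return records

    def validate(self, records):
        # Keep only records with a non-empty name and a positive amount.
        valid = [r for r in records if r.get("name", "").strip() and r.get("amount", 0) > 0]
        self.stats["validated"] = len(valid)
        self.stats["errors"] = len(records) - len(valid)
        return valid

    def transform(self, records):
        # Clean the text fields and add the amount_tier derived field.
        transformed = []
        for r in records:
            item = dict(r)
            item["name"] = item["name"].strip().title()
            item["email"] = item.get("email", "").strip().lower()
            amount = item["amount"]
            item["amount_tier"] = "low" if amount < 100 else "medium" if amount <= 250 else "high"
            transformed.append(item)
        self.stats["transformed"] = len(transformed)
        return transformed

    def load(self, records):
        # Load into an in-memory SQLite table and keep the connection around.
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, "
            "email TEXT, amount REAL, amount_tier TEXT)"
        )
        self.conn.executemany(
            "INSERT INTO customers VALUES (?, ?, ?, ?, ?)",
            [(r["id"], r["name"], r["email"], r["amount"], r["amount_tier"]) for r in records],
        )
        self.conn.commit()
        self.stats["loaded"] = len(records)

    def run(self, api_data, csv_data):
        records = self.extract(api_data, csv_data)
        valid = self.validate(records)
        transformed = self.transform(valid)
        self.load(transformed)
        print("=== PIPELINE SUMMARY ===")
        for key, value in self.stats.items():
            print(f"{key}: {value}")


api_data = [{"id": 1, "name": " alice johnson ", "email": "ALICE@Example.com", "amount": 120.5}]
csv_data = "id,name,email,amount\n2,Bob Smith,bob@example.com,75.0"
ETLPipeline().run(api_data, csv_data)
```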


Project Complete!

You've built a complete ETL pipeline from scratch. Here's what you accomplished:

  • Extracted data from multiple sources (API-like dicts and CSV strings)
  • Validated data quality with error and warning separation
  • Cleaned and standardized formats (whitespace, casing, placeholders)
  • Enriched records with derived fields (tiers, domains)
  • Loaded transformed data into a SQLite database
  • Verified the loaded data with SQL queries
  • Assembled everything into a reusable ETLPipeline class