Data pipelines with Python and SQL - Part 1

Preface🌟

Data engineering’s key objective is turning raw data into valuable and usable information. A key component of this process is the data pipeline.

In this article, we dive deep into what a data pipeline is and highlight Python and SQL’s roles in building them.

What is a data pipeline?🚰

A data pipeline is a system that facilitates the flow of data from one location to another through multiple stages. It imitates the behaviour of the plumbing built into homes, which carries water from one source to different areas of the house.

Non-valuable data is:

  • raw = directly from source

  • undesirable = unwanted, useless, not fit-for-purpose

  • low-quality = unstructured, corrupted, dirty, incomplete, unusable

Remember, the purpose of data engineering is to convert non-valuable data (raw, undesirable or low-quality data) into a structured, desirable and usable state (i.e. valuable data) that end users can apply to their own objectives.

In short, data by itself holds no value - only high-quality data contains value.

What qualifies as a data pipeline?✅

In short, a data pipeline must move data from a source to a destination. This can include moving data from a database into a data warehouse, from a third-party application into a visualization tool, or between databases. A source and a target are always required.

There are generally two methodologies for this:

  1. ETL (extract, transform, load) - this approach extracts data from the source systems, cleans and shapes the data into the desired structure and format, and then pushes it into the target location specified. Here’s the sequence it follows:

    1. Extract - pulls the raw data from source

    2. Transform - cleans the raw data into the desired format

    3. Load - stores clean data into the target destination

  2. ELT (extract, load, transform) - this approach is slightly different from the previous one: the data is extracted from the source systems, saved into the destination, and then cleaned and shaped into the desired format there before being used by the end users. Here’s the sequence it follows:

    1. Extract - pulls the raw data from source

    2. Load - stores raw data into the target destination

    3. Transform - cleans the raw data into the desired format
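To make the difference concrete, here is a minimal sketch of both orderings, using Python's built-in sqlite3 module as a stand-in destination. The sample rows, the table names (raw_sales, clean_sales) and the cleaning rules (trimming names, casting prices to numbers) are hypothetical; the point is only where the transform runs: in Python before loading (ETL), or in SQL inside the destination after loading (ELT).

```python
import sqlite3

# Hypothetical raw rows: names carry stray spaces and prices arrive as text
RAW_ROWS = [("  Aaron Stark ", "59.99"), ("Sophia Turner", "69.99")]


def etl(conn: sqlite3.Connection) -> None:
    """ETL: extract, transform in Python, then load only clean rows."""
    conn.execute("CREATE TABLE clean_sales (gamer_name TEXT, price REAL)")
    extracted = list(RAW_ROWS)  # Extract
    transformed = [(name.strip(), float(price)) for name, price in extracted]  # Transform
    conn.executemany("INSERT INTO clean_sales VALUES (?, ?)", transformed)  # Load


def elt(conn: sqlite3.Connection) -> None:
    """ELT: extract and load raw rows, then transform inside the destination with SQL."""
    conn.execute("CREATE TABLE raw_sales (gamer_name TEXT, price TEXT)")
    conn.executemany("INSERT INTO raw_sales VALUES (?, ?)", RAW_ROWS)  # Extract + Load
    conn.execute(  # Transform
        "CREATE TABLE clean_sales AS "
        "SELECT TRIM(gamer_name) AS gamer_name, CAST(price AS REAL) AS price "
        "FROM raw_sales"
    )


for pipeline in (etl, elt):
    conn = sqlite3.connect(":memory:")
    pipeline(conn)
    rows = conn.execute("SELECT * FROM clean_sales ORDER BY gamer_name").fetchall()
    print(pipeline.__name__, rows)
```

Both versions end with the same clean table. The ELT version also keeps the untouched raw_sales table in the destination, which can be handy if you want to re-run or change the transformations later.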

What doesn’t qualify as a data pipeline?🛑

Not everything that facilitates data movement qualifies as a data pipeline. A pipeline moves data through a series of distinct stages; if those stages don’t exist, there is no need to build one. Data pipelines exist because a series of complex activities must occur before the data is presented to the target users as expected.

Examples include:

  • Data transfers - simple data movements without any real data transformations

  • Ad-hoc scripts - spontaneous, non-recurring data movements powered by scripts

  • One-time data imports - non-repetitive data transfers

  • Manual data uploads - files loaded into a system by hand

Benefits of a data pipeline✨

  • Quality assurance🎯 - generates high-quality data consistently

  • Efficiency🤖 - reduces manual intervention

  • Versatility⏱️ - enables real-time and batch data processing for decision-making

In today’s digital world, we interact with data pipelines in almost every aspect of our lives. For instance, something as simple as ordering kitchen utensils on Amazon is powered by a series of steps facilitated by data pipelines, such as recording the selected items, processing the payment, running validation checks and updating the inventory databases.

SQL for data pipelines📜

SQL (Structured Query Language) is a language used to query and manipulate data in relational databases. In a data pipeline, you can use it to pull data from one table, transform it and load it into another table.

SQL is useful for building data pipelines that move data between tables and databases on the same platform, i.e. in-house migration activities. For example, if you’re migrating data between tables or schemas within PostgreSQL, SQL alone can support these operations (moving data across separate PostgreSQL databases typically requires an extension such as postgres_fdw or dblink).
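As a self-contained sketch of an in-house migration done purely in SQL, the example below uses SQLite through Python's sqlite3 module rather than PostgreSQL. SQLite's ATTACH DATABASE lets one connection see two separate databases, so the migration itself is a single INSERT ... SELECT. The database alias (archive), the table name (games) and the rows are hypothetical.

```python
import sqlite3

# The "main" database is the source; ATTACH adds a second, independent
# in-memory database that acts as the migration target
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS archive")

# Hypothetical source data
conn.execute("CREATE TABLE main.games (title TEXT, price REAL)")
conn.executemany(
    "INSERT INTO main.games VALUES (?, ?)",
    [("Final Fantasy XVI", 59.99), ("God of War: Ragnarok", 69.99)],
)

# The migration is plain SQL: create the target table, then copy rows across databases
conn.execute("CREATE TABLE archive.games (title TEXT, price REAL)")
conn.execute("INSERT INTO archive.games SELECT title, price FROM main.games")
conn.commit()

print(conn.execute("SELECT COUNT(*) FROM archive.games").fetchone()[0])  # 2
```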

Demo🚀

Let’s create some test data to explore a few examples that will make this easier to understand. Here I’ll be using a PostgreSQL database to create the empty tables the pipelines need:

CREATE TABLE IF NOT EXISTS public.source_table (
    sale_id SERIAL PRIMARY KEY,
    gamer_name VARCHAR(50),
    game_title VARCHAR(100),
    purchase_date DATE,
    price DECIMAL(10,2)
);

CREATE TABLE IF NOT EXISTS public.target_table (
    sale_id SERIAL PRIMARY KEY,
    gamer_name VARCHAR(50),
    game_title VARCHAR(100),
    price DECIMAL(10,2)
);

We’ve created two empty tables:

  • source_table - where the sample data originates from

  • target_table - where the ideal data is expected to be loaded into

Now let’s populate the source_table table with some sample test data:

INSERT INTO public.source_table (gamer_name, game_title, purchase_date, price) VALUES 
    ('Aaron Stark', 'Final Fantasy XVI', '2023-01-15', 59.99),
    ('Sophia Turner', 'God of War: Ragnarok', '2023-01-18', 69.99),
    ('Michael White', 'The Last of Us Part III', '2021-12-25', 49.99),
    ('Linda Green', 'Ratchet & Clank: Rift Apart', '2023-01-19', 59.99);

So now we have a simple table containing PlayStation games with the following columns:

  • gamer_name is the name of each gamer

  • game_title is the title of each game

  • purchase_date is the date each game was purchased

  • price is the cost of each game

The aim is to extract games purchased after January 1st 2023 and load the ones priced over $60 into target_table as a report.

Examples🧪

1. Satisfaction✅

Here’s what a basic SQL data pipeline would look like:

-- Load (L): insert the results into the target table
INSERT INTO public.target_table (gamer_name, game_title, price)
-- Extract (E): pull purchases made after January 1st 2023 from the source table
SELECT gamer_name, game_title, price
FROM public.source_table
WHERE purchase_date > '2023-01-01'
  -- Transform (T): keep only games priced over $60
  AND price > 60;

This is the order of steps the pipeline follows:

  • Data after January 1st 2023 is extracted from the source table (E)

  • Only games priced over $60 are kept; everything else is filtered out (T)

  • Filtered data is loaded into the target table (L)

Disclaimer: this is NOT the SQL engine’s order of execution, only the general order the pipeline itself follows (more on this in a future article).

Let’s see what the target_table table now contains - we’ll run this query:

SELECT * FROM public.target_table;

…and let’s see what it returns:

-- Output: 

1    "Sophia Turner"    "God of War: Ragnarok"    69.99
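If you don’t have PostgreSQL to hand, the whole satisfaction example can be reproduced with Python’s built-in sqlite3 module. This is a sketch under a few assumptions: the schema is adapted to SQLite (INTEGER PRIMARY KEY in place of SERIAL, NUMERIC in place of DECIMAL(10,2), no public schema), and dates are stored as ISO-formatted text, which compares correctly as strings.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# SQLite stand-ins for the PostgreSQL DDL: INTEGER PRIMARY KEY assigns ids
# automatically (like SERIAL) and NUMERIC replaces DECIMAL(10,2)
conn.executescript("""
CREATE TABLE source_table (
    sale_id INTEGER PRIMARY KEY,
    gamer_name TEXT, game_title TEXT, purchase_date TEXT, price NUMERIC
);
CREATE TABLE target_table (
    sale_id INTEGER PRIMARY KEY,
    gamer_name TEXT, game_title TEXT, price NUMERIC
);
""")

# Same sample rows as the article
conn.executemany(
    "INSERT INTO source_table (gamer_name, game_title, purchase_date, price) "
    "VALUES (?, ?, ?, ?)",
    [
        ("Aaron Stark", "Final Fantasy XVI", "2023-01-15", 59.99),
        ("Sophia Turner", "God of War: Ragnarok", "2023-01-18", 69.99),
        ("Michael White", "The Last of Us Part III", "2021-12-25", 49.99),
        ("Linda Green", "Ratchet & Clank: Rift Apart", "2023-01-19", 59.99),
    ],
)

# The pipeline: extract recent purchases, keep games over $60, load into the target
conn.execute("""
INSERT INTO target_table (gamer_name, game_title, price)
SELECT gamer_name, game_title, price
FROM source_table
WHERE purchase_date > '2023-01-01' AND price > 60
""")

print(conn.execute("SELECT * FROM target_table").fetchall())
# [(1, 'Sophia Turner', 'God of War: Ragnarok', 69.99)]
```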

2. Violation❌

Now here’s an example of what a data pipeline is NOT in SQL:

SELECT gamer_name, game_title 
FROM public.source_table 
WHERE price > 60;

Although this query extracts the games over $60, it doesn’t load the results anywhere or apply any meaningful transformation, so it doesn’t qualify as a data pipeline; it’s simply a SQL query.

Python for data pipelines🐍

Python is a general-purpose programming language. Its popularity comes from its simple syntax, versatile use cases and the vast array of libraries built by the Python community for different purposes.

Libraries and tools like Pandas, NumPy and Airflow have long been popular for data processing and orchestration tasks, while Polars, Dask and Mage are quickly gaining traction in the industry.

Demo🚀

For simplicity's sake, we’ll use the same data as the SQL example, but this time in Python. The only difference will be the transformation applied to the data.

import pandas as pd

# Create the sample data
data = {
    'gamer_name': ['Aaron Stark', 'Sophia Turner', 'Michael White', 'Linda Green'],
    'game_title': ['Final Fantasy XVI', 'God of War: Ragnarok', 'The Last of Us Part III', 'Ratchet & Clank: Rift Apart'],
    'purchase_date': ['2023-01-15', '2023-01-18', '2021-12-25', '2023-01-19'],
    'price': [59.99, 69.99, 49.99, 59.99]
}

# Convert data dictionary to a DataFrame
df = pd.DataFrame(data)

# Save this data to a CSV file, representing our source data
df.to_csv('source_playstation_games.csv', index=False)

This time, we’ll apply a 10% discount to games purchased on or before December 31st 2022 and save the transformed data into another file.

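# Define a function to apply a 10% discount to games bought before 2023
def apply_discount(row):
    # ISO dates (YYYY-MM-DD) sort correctly as text, so a string comparison works here
    if row['purchase_date'] <= '2022-12-31':
        return round(row['price'] * 0.9, 2)  # round to whole cents
    return row['price']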

Examples🧪

1. Satisfaction✅

Here’s an example of a data pipeline using Python:

# 1. Extract data from source CSV
data_extracted = pd.read_csv('source_playstation_games.csv')

# 2. Transform data: Apply discount to game price
data_extracted['price'] = data_extracted.apply(apply_discount, axis=1)

# 3. Load transformed data to another CSV file
data_extracted.to_csv('target_playstation_games.csv', index=False)

As in the SQL satisfaction example, the code follows the E-T-L order:

  • The games are extracted from the CSV file source_playstation_games.csv (E)

  • A 10% discount is applied to the prices of games purchased before 2023 via the custom apply_discount function (T)

  • The transformed data is persisted into a new CSV file target_playstation_games.csv (L)
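The row-by-row apply works, but pandas can also apply the same rule to whole columns at once. The sketch below is an alternative to the apply_discount approach, not the article’s method: it parses purchase_date as real dates instead of comparing strings, rounds discounted prices to cents, and uses in-memory io.StringIO buffers in place of the two CSV files so it runs anywhere (pass the real file paths to read and write on disk instead).

```python
import io

import pandas as pd

# In-memory stand-in for source_playstation_games.csv
SOURCE_CSV = io.StringIO(
    "gamer_name,game_title,purchase_date,price\n"
    "Aaron Stark,Final Fantasy XVI,2023-01-15,59.99\n"
    "Sophia Turner,God of War: Ragnarok,2023-01-18,69.99\n"
    "Michael White,The Last of Us Part III,2021-12-25,49.99\n"
    "Linda Green,Ratchet & Clank: Rift Apart,2023-01-19,59.99\n"
)

# Extract: parse purchase_date into datetime values while reading
df = pd.read_csv(SOURCE_CSV, parse_dates=["purchase_date"])

# Transform: vectorized 10% discount for purchases on or before 2022-12-31
is_old = df["purchase_date"] <= pd.Timestamp("2022-12-31")
df.loc[is_old, "price"] = (df.loc[is_old, "price"] * 0.9).round(2)

# Load: write to an in-memory buffer (stand-in for target_playstation_games.csv)
target = io.StringIO()
df.to_csv(target, index=False)

print(df[["gamer_name", "price"]])
```

Only Michael White’s 2021 purchase is discounted; on larger datasets the column-wise version typically avoids the per-row Python function calls that apply(axis=1) makes.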

2. Violation❌

Now here’s an example of what a data pipeline is NOT in Python:

import pandas as pd

# Extract data from CSV
data_violation = pd.read_csv('source_playstation_games.csv')

# Convert data into Excel format 
data_violation.to_excel('destination_playstation_games.xlsx')

Changing a file’s format doesn’t make a job an ETL pipeline. The essence of a data pipeline is converting raw data into usable information. If the source data is already usable, no transformation is required, so the job is a simple data transfer or, in this case, a file conversion job.

In most real-world scenarios, this job would be included in a data pipeline, but wouldn’t constitute one by itself.

Best practices with data pipelines🥇

High-quality data pipelines must be:

  • reliable🛡️ - Ensure your data pipelines are robust enough to handle a wide range of scenarios, including unexpected inputs and edge cases

  • modular🧩 - When a pipeline is built from separate stages (or modules), you can pinpoint the location of an error without dismantling the entire pipeline

  • idempotent🔁 - Build your pipelines so that running them multiple times produces the same results; this is one of the key characteristics of a high-quality data pipeline

  • version-controlled🔗 - As schemas and transformation logic evolve, keep a copy of each version of your pipeline so you can roll back if something doesn’t go as planned, or meet audit requirements from your organisation’s regulatory body

  • scalable📈 - Design your pipelines so they can handle both small and large volumes of data. It is good practice to expect data volumes to grow over time and build pipelines that won’t need significant modifications when they do

  • fault-tolerant⚙️ - Data pipelines can fail unexpectedly, causing unintended consequences like duplicated or incomplete data. Put mechanisms in place so systems can restart from the point of failure without negatively affecting dependencies
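Several of these practices can be sketched together on the SQL example. The version below, written with Python’s sqlite3 module, splits the pipeline into separate extract, transform and load functions (modular), upserts rows keyed on the source sale_id so reruns don’t duplicate data (idempotent), and wraps the load in a transaction so a failed run leaves the target unchanged (one ingredient of fault tolerance). Keying the target on the source sale_id and using SQLite’s INSERT OR REPLACE are assumptions of this sketch; the article’s target_table generates its own ids.

```python
import sqlite3


def extract(conn):
    """Extract: pull purchases made after 2023-01-01."""
    return conn.execute(
        "SELECT sale_id, gamer_name, game_title, price FROM source_table "
        "WHERE purchase_date > '2023-01-01'"
    ).fetchall()


def transform(rows):
    """Transform: keep games priced over $60 (price is the 4th column)."""
    return [row for row in rows if row[3] > 60]


def load(conn, rows):
    """Load: upsert keyed on sale_id so reruns overwrite rather than duplicate."""
    with conn:  # one transaction: commits on success, rolls back on an exception
        conn.executemany(
            "INSERT OR REPLACE INTO target_table "
            "(sale_id, gamer_name, game_title, price) VALUES (?, ?, ?, ?)",
            rows,
        )


def run_pipeline(conn):
    load(conn, transform(extract(conn)))


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_table (sale_id INTEGER PRIMARY KEY, gamer_name TEXT,
                           game_title TEXT, purchase_date TEXT, price REAL);
CREATE TABLE target_table (sale_id INTEGER PRIMARY KEY, gamer_name TEXT,
                           game_title TEXT, price REAL);
INSERT INTO source_table (gamer_name, game_title, purchase_date, price) VALUES
    ('Aaron Stark', 'Final Fantasy XVI', '2023-01-15', 59.99),
    ('Sophia Turner', 'God of War: Ragnarok', '2023-01-18', 69.99);
""")

run_pipeline(conn)
run_pipeline(conn)  # rerun: the target still holds one row, not two
print(conn.execute("SELECT COUNT(*) FROM target_table").fetchone()[0])  # 1
```

In PostgreSQL, the equivalent upsert is INSERT ... ON CONFLICT (sale_id) DO UPDATE SET ..., which requires a primary key or unique constraint on sale_id.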

Conclusion🏁

Hopefully, you now have a rough understanding of data pipelines with Python and SQL. In part 2, we’ll delve into a hands-on project that demonstrates how Python and SQL are combined to create a real-world data pipeline, so stay tuned!