Data pipelines with Python and SQL - Part 2

Using Python & SQL to build a real-world ETL pipeline and dashboard

Preface ✨

In part 1, we explored the concept of data pipelines, saw how Python and SQL can be used to create them, and walked through simple code examples.

This time in part 2, we will venture into a hands-on project that uses Python and SQL together on real-world data. You won’t want to miss this one!

This is what the dashboard of our ETL pipeline renders once we run it:

Now let’s show you the code that makes this possible step-by-step!

Approach 🪖

  • Set up environment

  • Set up configuration

  • Set up logger

  • Extract data from API

  • Transform data using pre-processing logic

  • Load data into Postgres database

  • Visualize data in a Streamlit app

1. Set up the environment🏖️

a. List the Python modules required

  • Create a requirements.txt file

  • Add all the relevant libraries required for the data pipeline:

requests
pandas
python-dotenv
psycopg2
streamlit
plotly-express

b. Create the virtual environment

I use Anaconda as my package manager and Windows as my main operating system; feel free to use your preferred tools to create your virtual environment.

In the terminal, type in the following:

conda create --name data_engineering_py_sql python=3.9
conda activate data_engineering_py_sql
pip install -r requirements.txt

You can list all the modules in the virtual environment by typing:

conda list -n data_engineering_py_sql

Then you can open up VS Code in the project directory using:

code .

c. Add the environment variables file

It is good practice to store sensitive credentials in an environment variable file to avoid unauthorised users accessing them. This will include API keys, access tokens, and passwords, among others.

Assuming you’re in the directory of your choice,

  • Create a .env file

  • Enter all your sensitive credentials and environment variables into it:

API_KEY=your_api_key
API_HOST=your_api_host
LEAGUE_ID=your_league_id
SEASON=your_season
DB_NAME=your_database_name
DB_USERNAME=your_database_user
DB_PASSWORD=your_database_password
DB_HOST=your_database_host
DB_PORT=your_database_port

The os and python-dotenv modules help import these securely into our main script.

d. Add the .gitignore file

Create a .gitignore file to exclude from version control any files that are too sensitive to expose publicly or simply not worth tracking:

.env
__pycache__/
*.log

The files we want to be ignored when committing our repository to GitHub include:

  • .env - to avoid exposing sensitive environment variables

  • __pycache__/ folders - to avoid committing Python’s auto-generated bytecode caches

  • .log files - to hide files for storing logs

2. Set up configuration⚙️

First, we’ll import all the Python dependencies for this project:

import os
import logging 
import requests 
import psycopg2
import pandas as pd 
from dotenv import load_dotenv
from requests.exceptions import HTTPError, Timeout, RequestException

Now we load the environment variables into the session using the os and python-dotenv modules:

# Load the environment variables 
load_dotenv()

API_KEY         =   os.getenv("API_KEY")
API_HOST        =   os.getenv("API_HOST")
LEAGUE_ID       =   os.getenv("LEAGUE_ID")
SEASON          =   os.getenv("SEASON")
DB_NAME         =   os.getenv("DB_NAME")
DB_USERNAME     =   os.getenv("DB_USERNAME")
DB_PASSWORD     =   os.getenv("DB_PASSWORD")
DB_HOST         =   os.getenv("DB_HOST")
DB_PORT         =   os.getenv("DB_PORT")

Note that although the package is installed as python-dotenv, it is imported under the module name dotenv - that is where the load_dotenv function lives.

You can find it in this GitHub repository here.

3. Set up logger📝

Python has a built-in module called logging that allows you to track different stages of code execution by streaming custom messages to the console or writing them directly into log files. This is useful when diagnosing unexpected behaviours or defects in the code.

A logger can have different severity levels that can be tagged for different situations. Here are the ones available:

  • DEBUG (10)

  • INFO (20)

  • WARNING (30)

  • ERROR (40)

  • CRITICAL (50)
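To see these levels in action, here is a minimal standalone sketch (for illustration only - it is separate from the pipeline code that follows):

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

logger.debug("Fine-grained diagnostic detail")          # 10
logger.info("Pipeline stage completed")                 # 20
logger.warning("API response took longer than usual")   # 30
logger.error("Failed to parse the API response")        # 40
logger.critical("Lost the database connection")         # 50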

3.1 Initialize the logger and set up the severity level

We start by setting up the logger and setting the minimum severity level to DEBUG:

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')

This means it captures all log messages from DEBUG and upwards (i.e. INFO, WARNING etc.).

3.2 Set up the handlers

The handlers are the objects that help us direct the log messages to our target destinations, like files, consoles or other outputs.

Here’s how we set up the handlers:

File handler

This handler writes messages to a log file on the system.

# Create a file handler
file_handler = logging.FileHandler('football_table_standings.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

Console handler

This handler streams the messages to the main console.

# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

3.3 Add handlers to the logger

To use these handlers, we need to add them to the logger object:

# Instantiate the logger object
logger = logging.getLogger()


# Add the file handler to the logger
logger.addHandler(file_handler)


# Add the console handler to the logger
logger.addHandler(console_handler)
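A quick sanity check that the wiring works - this one message should now appear in both destinations:

# Should show up in the console and in football_table_standings.log
logger.info("Logger configured with file and console handlers")

(If you see each console message twice, it’s because basicConfig already attached its own console handler; passing handlers=[file_handler, console_handler] directly to basicConfig is one way to avoid the duplicate.)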

3.4 Why loggers instead of print statements?

Using loggers is a better practice than using print statements because

  • you are provided with different severity levels, which is useful when working across environments - in production you may only want WARNING messages and above, while in development you also need DEBUG and INFO (see the sketch after this list)

  • you can write the outputs to different destinations like files, consoles, and third-party platforms, whereas print is limited to console output. A robust logging setup can even be integrated with monitoring and alerting tools - print statements offer no such flexibility

  • log messages can carry extra context like timestamps, file names, and line numbers, which makes troubleshooting far easier than sifting through bare print output
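Here is that first point as a minimal sketch - choosing the log level from an environment variable (APP_ENV is a hypothetical variable name, not one this pipeline uses):

import logging
import os

# Hypothetical: APP_ENV is assumed to hold "production" or "development"
is_production = os.getenv("APP_ENV") == "production"

logging.basicConfig(
    level=logging.WARNING if is_production else logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)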

4. Extract data from API👾

An API is an interface that enables two systems to exchange information. When one system (the client) sends a request to another system (the server) for certain information, the API carries the client’s request and returns the information once the server approves the authorization.

In this scenario, our data pipeline is sending a request to the football API endpoint for the league table standings.

4.1 Define the API endpoints and headers

url      =   "https://api-football-v1.p.rapidapi.com/v3/standings"
headers  =   {"X-RapidAPI-Key": API_KEY,
              "X-RapidAPI-Host": API_HOST}

🚫Note: Avoid hard-coding API keys into your script; instead, store them in an environment variables file and load them in from there.

4.2 Define the query parameters

This is for customizing the response provided by the API endpoint. Here we want the data to be filtered by the current season and league, represented by the SEASON and LEAGUE_ID variables respectively:


query_string  =   {'season': SEASON,
                   'league': LEAGUE_ID}

4.3 Send the API request with exception handling

The requests library (a third-party package, despite how ubiquitous it is) makes API calls simple. But unpredictable issues can still occur - the API temporarily going down, the rate limit being exceeded, a request timing out - and we need mechanisms in place to handle these cases gracefully.

We can use try-except blocks for this:

try:
    api_response = requests.get(url, headers=headers, params=query_string, timeout=15)
    api_response.raise_for_status() 

except HTTPError as http_err:
    logger.error(f'HTTP error occurred: {http_err}')

except Timeout:
    logger.error('Request timed out after 15 seconds')

except RequestException as request_err:
    logger.error(f'Request error occurred: {request_err}')

Let’s break down what is happening:

We make the API request in the try block and store the result in the api_response object; api_response.raise_for_status() then raises an HTTPError if the call returned an unsuccessful status code.

For each except block, different exceptions are expected to be raised if an error or issue is identified:

  • HTTPError - raised (here, by raise_for_status()) when the server returns an unsuccessful HTTP status code.

  • Timeout - raised when the API fails to respond within the given time limit. We’ve set the timeout to 15 seconds, so any request taking longer than that raises a Timeout.

  • RequestException - the base class for the requests library’s exceptions. Because HTTPError and Timeout are subclasses of it, this block must come last so it only catches whatever the more specific handlers didn’t.

Having these in place makes our pipeline robust enough to handle the most common disruptions gracefully.
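If you want to go a step further, a common hardening pattern is to retry failed requests with a short backoff before giving up. A minimal sketch of the idea (optional, not part of the original pipeline):

import time

def fetch_with_retries(url, headers, params, attempts=3):
    """Retry the API call with exponential backoff before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=15)
            response.raise_for_status()
            return response
        except RequestException as request_err:
            logger.warning(f'Attempt {attempt} of {attempts} failed: {request_err}')
            if attempt < attempts:
                time.sleep(2 ** attempt)  # back off: 2s, then 4s
    raise RuntimeError('API request failed after all retry attempts')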

4.4 Parse the API response

After obtaining a successful response from the API, we can advance to parsing the JSON response it provides:

standings_data = api_response.json()['response']

5. Transform data using pre-processing logic🛠️

The next step is to shape the data into a structure that is approachable and usable.

5.1. Extract the data from the JSON response

We drill into the nested fields to find the standings data:

# Extracting the standings information
standings = standings_data[0]['league']['standings'][0]
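For orientation, the part of the payload we are navigating is shaped roughly like this (an abridged sketch inferred from the fields we access below, not the full API response):

# Abridged sketch of the nested response, based on the fields used below
standings_data = [
    {
        "league": {
            "standings": [
                [  # one inner list per group; the Premier League has a single group
                    {
                        "rank": 1,
                        "team": {"name": "Manchester City"},
                        "points": 12,
                        "goalsDiff": 9,
                        "all": {
                            "played": 4, "win": 4, "draw": 0, "lose": 0,
                            "goals": {"for": 11, "against": 2},
                        },
                    },
                    # ...one dictionary per team
                ]
            ]
        }
    }
]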

5.2. Flatten the data

Then we need to shape this data into a format that’s easy to use.

To do this, we need to

  • iterate through each team’s standings,

  • extract the relevant fields,

  • append the fields to a list

To start, create an empty list:

data_list = []

Then we create a for loop to begin extracting and appending the data to the same list:

# Extract and append data into the list
for team_info in standings:
    rank            =   team_info['rank']
    team_name       =   team_info['team']['name']
    played          =   team_info['all']['played']
    win             =   team_info['all']['win']
    draw            =   team_info['all']['draw']
    lose            =   team_info['all']['lose']
    goals_for       =   team_info['all']['goals']['for']
    goals_against   =   team_info['all']['goals']['against']
    goals_diff      =   team_info['goalsDiff']
    points          =   team_info['points']

    data_list.append([rank,
                      team_name,
                      played,
                      win,
                      draw,
                      lose,
                      goals_for,
                      goals_against,
                      goals_diff,
                      points])

5.3. Convert the data into a dataframe

We will use the pandas library to turn the list into a dataframe:

# Create the DataFrame
columns         =   ['P', 'Team', 'GP', 'W', 'D', 'L', 'F', 'A', 'GD', 'Pts']
standings_df    =   pd.DataFrame(data_list, columns=columns)

…and then we’ll display the dataframe:

# Display the DataFrame
print(standings_df.to_string(index=False))

This should return the following results (as of 12th September 2023):

 P              Team  GP  W  D  L  F  A  GD  Pts
 1   Manchester City   4  4  0  0 11  2   9   12
 2         Tottenham   4  3  1  0 11  4   7   10
 3         Liverpool   4  3  1  0  9  3   6   10
 4          West Ham   4  3  1  0  9  4   5   10
 5           Arsenal   4  3  1  0  8  4   4   10
 6          Brighton   4  3  0  1 12  6   6    9
 7    Crystal Palace   4  2  1  1  5  4   1    7
 8         Brentford   4  1  3  0  8  5   3    6
 9 Nottingham Forest   4  2  0  2  6  6   0    6
10       Aston Villa   4  2  0  2  8  9  -1    6
11 Manchester United   4  2  0  2  5  7  -2    6
12           Chelsea   4  1  1  2  5  5   0    4
13            Fulham   4  1  1  2  4 10  -6    4
14         Newcastle   4  1  0  3  7  7   0    3
15            Wolves   4  1  0  3  4  8  -4    3
16       Bournemouth   4  0  2  2  4  8  -4    2
17     Sheffield Utd   4  0  1  3  4  7  -3    1
18           Everton   4  0  1  3  2  8  -6    1
19             Luton   3  0  0  3  2  9  -7    0
20           Burnley   3  0  0  3  3 11  -8    0
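As an aside, pandas can flatten nested JSON like this in a single step. Here’s a compact alternative to the manual loop above, shown as a sketch (json_normalize and its sep argument are standard pandas; the column selection simply mirrors the fields we extracted):

# Optional alternative: flatten the nested records directly with pandas.
# sep='_' turns nested keys into 'team_name', 'all_played', 'all_goals_for', etc.
flat_df = pd.json_normalize(standings, sep='_')

standings_df = flat_df[['rank', 'team_name', 'all_played', 'all_win',
                        'all_draw', 'all_lose', 'all_goals_for',
                        'all_goals_against', 'goalsDiff', 'points']]
standings_df.columns = ['P', 'Team', 'GP', 'W', 'D', 'L', 'F', 'A', 'GD', 'Pts']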

6. Load data into Postgres database📤

Now that we have our data transformed into a suitable format, we can upload this into the Postgres database.

6.1. Establish a connection with the database

We first need to set up the connection:

# Set up Postgres database connection
postgres_connection = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

# Get a cursor from the database
cur = postgres_connection.cursor()

6.2. Create a table in the database

The next step is to create a table in the Postgres database that matches the structure of the dataframe.

We can write this in SQL, then execute and commit it:

# Use SQL to create a table for the Premier League
create_table_sql_query = """
    CREATE TABLE IF NOT EXISTS premier_league_standings_tbl (
            position            INT PRIMARY KEY,
            team                VARCHAR(255),
            games_played        INTEGER,
            wins                INTEGER,
            draws               INTEGER,
            losses              INTEGER,
            goals_for           INTEGER,
            goals_against       INTEGER,
            goal_difference     INTEGER,
            points              INTEGER
    );
"""


# Run the SQL query 
cur.execute(create_table_sql_query)


# Save the changes made in the Postgres database by committing them
postgres_connection.commit()

6.3. Insert the data into the table

Now that the table is created, we can loop through the dataframe and insert the data into the Postgres table.

Let’s start by writing the INSERT query into a Python variable:

# Use SQL to insert data into the Premier League table 
insert_data_sql_query = """
    INSERT INTO public.premier_league_standings_tbl (
            position, team, games_played, wins, draws, losses, goals_for, goals_against, goal_difference, points              
    )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)

    ON CONFLICT (position) DO UPDATE SET
    team               =   EXCLUDED.team, 
    games_played       =   EXCLUDED.games_played, 
    wins               =   EXCLUDED.wins, 
    draws              =   EXCLUDED.draws, 
    losses             =   EXCLUDED.losses, 
    goals_for          =   EXCLUDED.goals_for, 
    goals_against      =   EXCLUDED.goals_against, 
    goal_difference    =   EXCLUDED.goal_difference, 
    points             =   EXCLUDED.points
"""

Let me quickly break down what this statement is doing:

  • INSERT INTO...VALUES - for inserting the data into the premier_league_standings_tbl table and the columns specified

  • ON CONFLICT...DO UPDATE SET - for performing an upsert into the premier_league_standings_tbl table, i.e. it updates rows with fresh data if they already exist, or inserts new rows if they do not

Then we write the for loop to iterate through each row of the standings_df dataframe:


for _, row in standings_df.iterrows():
    cur.execute(insert_data_sql_query, 
                (row['P'], 
                 row['Team'], 
                 row['GP'], 
                 row['W'], 
                 row['D'], 
                 row['L'], 
                 row['F'], 
                 row['A'], 
                 row['GD'], 
                 row['Pts'])
    )

# Save the changes made in the Postgres database by committing them 
postgres_connection.commit()
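A quick aside on performance: executing one INSERT per row is perfectly fine for 20 teams, but for much larger datasets you would want to batch the statements. psycopg2’s cursor exposes executemany for this, and it works with the same query unchanged (psycopg2.extras.execute_values is typically faster still). A sketch of the executemany variant:

# Optional alternative: batch all rows in one call instead of looping.
# itertuples(index=False, name=None) yields plain tuples in column order,
# matching the order of the placeholders in insert_data_sql_query.
rows = list(standings_df.itertuples(index=False, name=None))
cur.executemany(insert_data_sql_query, rows)
postgres_connection.commit()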

6.4. Create a SQL view for ranked standings

After loading the data into the table, we need our football data to reflect the true order based on the performance metrics of each team after each game is played. We will do this by ordering the data by the points, goal difference, and then the goals scored by each team.

To avoid modifying the existing data, we can create a view to encapsulate the transformation logic:

# Use SQL to create a view that updates the table rankings
create_ranked_standings_view_sql_query = """
    CREATE OR REPLACE VIEW public.premier_league_standings_vw AS 
        SELECT 
            RANK() OVER (ORDER BY points DESC, goal_difference DESC, goals_for DESC) as position
            ,team
            ,games_played
            ,wins
            ,draws
            ,losses
            ,goals_for
            ,goals_against
            ,goal_difference
            ,points
        FROM 
            public.premier_league_standings_tbl;
"""

# Run the SQL query 
cur.execute(create_ranked_standings_view_sql_query)

# Save the changes made in the Postgres database by committing them
postgres_connection.commit()

The RANK() function is a window function that ranks each team by the specified columns, letting us derive each team’s true position without modifying the underlying table. One caveat worth knowing: RANK() gives tied rows the same position and skips the next value - although a tie here would require identical points, goal difference, and goals scored.

6.5. Close the database connection

# Close the database cursor and the Postgres connection
cur.close()
postgres_connection.close()
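One optional refinement: if anything in the load steps raises an exception, these two cleanup lines never run. Wrapping the steps in try/finally guarantees the cleanup happens - a sketch of the pattern, not a change the pipeline above makes:

# Sketch: ensure the cursor and connection are closed even if a step fails
try:
    cur.execute(create_table_sql_query)
    postgres_connection.commit()
    # ...insert the rows, create the view, commit...
finally:
    cur.close()
    postgres_connection.close()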

7. Visualize data in a Streamlit app📊

The final step is to visualize the Premier League table standings.

7.1. Set up the Streamlit app

We need to create a Python file named app.py for the Streamlit app, then import the libraries it needs:

import os
import psycopg2
import pandas as pd 
from PIL import Image
import streamlit as st
import plotly.express as px
from dotenv import load_dotenv

Then we initialize the environment variables:

# Load the environment variables 
load_dotenv()

API_KEY         =   os.getenv("API_KEY")
API_HOST        =   os.getenv("API_HOST")
LEAGUE_ID       =   os.getenv("LEAGUE_ID")
SEASON          =   os.getenv("SEASON")
DB_NAME         =   os.getenv("DB_NAME")
DB_USERNAME     =   os.getenv("DB_USERNAME")
DB_PASSWORD     =   os.getenv("DB_PASSWORD")
DB_HOST         =   os.getenv("DB_HOST")
DB_PORT         =   os.getenv("DB_PORT")

7.2. Fetch the data from the Postgres database

The read_sql function from pandas lets us pull data from Postgres with ease. Since app.py runs as its own script, we first open a fresh database connection, then run the query:

# Fetch the Premier League data from Postgres
get_premier_league_standings_sql_query = """
    SELECT 
        position
        ,team
        ,games_played
        ,wins
        ,draws
        ,losses
        ,goals_for
        ,goals_against
        ,goal_difference
        ,points
    FROM 
        public.premier_league_standings_vw
    ORDER BY 
        position;
"""

# Establish the Postgres connection for the app
postgres_connection = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

# Read football data into dataframe
final_standings_df = pd.read_sql(get_premier_league_standings_sql_query, postgres_connection)

# Use the position column as the index so the default numeric index isn't displayed
final_standings_df.set_index('position', inplace=True)

# Close database connection
postgres_connection.close()
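Worth knowing: Streamlit reruns the entire script on every widget interaction, so as written this query hits Postgres on each rerun. Wrapping the fetch in a function decorated with st.cache_data (a standard Streamlit decorator in recent versions) avoids that. A sketch of the idea - the ttl value is an arbitrary choice:

# Optional: cache the query result so reruns don't hit the database
@st.cache_data(ttl=600)  # keep the result for 10 minutes
def fetch_standings(query: str) -> pd.DataFrame:
    connection = psycopg2.connect(
        dbname=DB_NAME, user=DB_USERNAME, password=DB_PASSWORD,
        host=DB_HOST, port=DB_PORT
    )
    try:
        return pd.read_sql(query, connection)
    finally:
        connection.close()

final_standings_df = fetch_standings(get_premier_league_standings_sql_query)
final_standings_df.set_index('position', inplace=True)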

7.3. Display the data in Streamlit

Now let’s work on the UI a little.

We’ll start by defining the page title and the icon for the app:

# Set the page configuration of the app
st.set_page_config(
    page_title   =  "Premier League Standings 2023/24",
    page_icon    =  "⚽",
    layout       =  "wide"
)

Let’s also add the Premier League logo to the top right-hand side of the web app:

# Read image into app
prem_league_logo_filepath  =  "assets/premier_league_logo.png"
prem_league_logo_image     =  Image.open(prem_league_logo_filepath)

# Create columns for the layout and display the image through the 2nd one
col1, col2 = st.columns([4, 1])
col2.image(prem_league_logo_image)

…and then add a main title:

st.title("⚽🏆 Premier League Table Standings 2023/24 ⚽🏆")

Let’s also add a sidebar to display some brief useful information:

# Display instructions
st.sidebar.title('Instructions 📖')
st.sidebar.write("""
The table showcases the current Premier League standings for the 2023/24 season. Toggle the visualization options to gain deeper insights!
""")

Users can choose whether they also want to see a visual representation of the football data from the table:

show_visualization = st.sidebar.radio('Would you like to view the standings as a visualization too?', ('No', 'Yes'))

st.table(final_standings_df)

if show_visualization == 'Yes':
    st.write("")
    fig_specification = px.bar(final_standings_df,
                        x           =   'team',
                        y           =   'points',
                        title       =   'Premier League Standings 2023/24',
                        labels      =   {'points':'Points', 'team':'Team', 'goals_for': 'Goals Scored', 'goals_against': 'Goals Conceded', 'goal_difference':'Goal Difference'},
                        color       =   'team',
                        height      =   600,
                        hover_data  =   ['goals_for', 'goals_against', 'goal_difference']
    )
    st.plotly_chart(fig_specification, use_container_width=True)

7.4. Run the app

Enter this into the terminal:

streamlit run app.py

Now the dashboard should be rendered in a Streamlit app, like the demo shown at the start of this post!🥳

You can find the full codebase in the GitHub repo here.

Conclusion🏁

From sending API calls to the endpoint, to using SQL to extract, transform and load the data, to wrapping it all in a Streamlit app, we’ve demonstrated that Python and SQL can be combined into an ETL pipeline that feeds data into a tool others can use in the real world.

The most important takeaway is that this same approach underpins many real-world applications.

Stay tuned, there just may be a part 3 coming soon, where we deal with migrating data from the database into the cloud this time!👀

Feel free to reach out via my handles and I’ll be happy to point you in the right direction where I can: LinkedIn | Email | Twitter | TikTok