Unit testing in data engineering with Python 🔧🐍

What is unit testing?🤔🧪

A unit test is an automated check that verifies an individual unit of code behaves as expected. This can mean testing any of the following components in isolation from the rest of the source code (a minimal example follows this list):

  • function

  • module

  • block (or body) of code

  • interface
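
For instance, here is a minimal sketch of a unit test that exercises a single function in isolation (the function and its logic are hypothetical, purely for illustration):

import unittest

# Hypothetical unit of code under test: convert Celsius to Fahrenheit
def to_fahrenheit(celsius):
    return celsius * 9 / 5 + 32

class TestToFahrenheit(unittest.TestCase):

    # The test exercises this one function in isolation from everything else
    def test_freezing_point(self):
        self.assertEqual(to_fahrenheit(0), 32)

if __name__ == "__main__":
    unittest.main()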

How can it fit data engineering use cases?🔧📊

Unit testing is often associated with software development, but it can also be applied to data engineering tasks, especially when building data pipelines, platforms and infrastructure. The complex operations involved in building data solutions can benefit from unit testing practices.

Applying it involves verifying that each individual unit of a data solution contains the right business logic and behaves as expected.

Unit testing can be used to run validation checks on the following (see the sketch after this list):

  • Data quality ✅- confirming the data meets the expected level of integrity

  • Business logic 📈- verifying the data represents real events regarding the business

  • Schema compliance 📐- validating the structure of tables, databases and data meet the expected data types, naming conventions and other custom logic

  • Data transformations🔄 - checking the data is manipulated in line with the developer’s intentions and the user’s expectations
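
As a rough illustration, here is a minimal sketch of what some of these checks might look like against a pandas DataFrame (the column names, thresholds and expected types below are hypothetical; transformation checks appear in the walkthrough later on):

import unittest
import pandas as pd

class TestTripData(unittest.TestCase):

    # Hypothetical in-memory data standing in for a pipeline's output
    def setUp(self):
        self.df = pd.DataFrame({"route": ["A", "B"], "passenger_count": [50, 20]})

    # Data quality: no missing values anywhere in the data
    def test_no_nulls(self):
        self.assertFalse(self.df.isnull().values.any())

    # Business logic: passenger counts can never be negative
    def test_passenger_count_is_non_negative(self):
        self.assertTrue((self.df["passenger_count"] >= 0).all())

    # Schema compliance: expected columns and data types are present
    def test_schema(self):
        self.assertListEqual(list(self.df.columns), ["route", "passenger_count"])
        self.assertEqual(str(self.df["passenger_count"].dtype), "int64")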

Benefits of unit tests in data engineering

  • Increase in test efficiency⏱- Reduces the amount of time and effort spent on manual tests

  • Early bug detection🐞- Easier to catch bugs and issues earlier in the code base

  • Increased testing accuracy🎯- Higher degree of accuracy than manual testing approaches

  • Higher degree of data quality📈 - Improves the quality of data pipelines

  • Increased confidence💪- Data pipelines that are tested well boost confidence in the code’s outputs and integrity

  • Simplified debugging🔍- Unit tests point to the issues with the data product’s code instead of leaving the developer to scan and guess root causes manually

Drawbacks of unit testing in data engineering

  • Resource-heavy to develop⏳- Requires more time and effort to write and maintain than manual testing

  • Difficult on large data📊- Unit testing can be challenging with data of significant volume and velocity, so it may not be appropriate to depend on it alone in these cases

  • Limitations in coverage💻- Unit tests may not cover all external data dependencies and integrations, which can leave potential issues unidentified

Stages of unit tests🔁

  • Setup - Prepare the test environment

  • Creation - Write the tests

  • Execution - Run the tests then check the results

  • Tear down - Clean up the test environment by disposing of the state changes made by the tests and clearing temporary resources

I will be working with the following files to demonstrate what is involved in each stage of a standard unit test:

  1. A test dataset named test_data.csv:
route,passenger_count,time_stamp,driver_id,vehicle_id,is_weekend,driver_name,vehicle_model
A,50,2023-06-15 10:00:00,1,100,True,John,XC90
B,20,2023-06-15 11:00:00,2,101,False,Susan,Corolla
C,5,2023-06-15 12:00:00,3,102,True,Robert,Civic
D,100,2023-06-15 13:00:00,4,103,False,Linda,Accord
A,25,2023-06-15 14:00:00,5,104,True,Jack,Model S
B,15,2023-06-15 15:00:00,6,105,False,Patricia,CX-5
C,3,2023-06-15 16:00:00,7,106,True,James,RAV4
D,80,2023-06-15 17:00:00,8,107,False,Jennifer,Outback
  2. A Python script with the relevant data processing functions, named dummy_data_pipeline.py:
import pandas as pd

# Load data for transportation into dataframe
def load_data(file_path):
    df = pd.read_csv(file_path)
    return df

# Filter the transportation data by passenger count
def filter_data(df):
    filtered_df = df[df["passenger_count"] >= 30]
    return filtered_df

# Clean the transportation data
def clean_data(df):
    cleaned_df = df.dropna()
    return cleaned_df

# Aggregate the transportation data
def aggregate_data(df):
    aggregated_df = df.groupby("route").sum()
    return aggregated_df
  3. A unit test script called test_dummy_data_pipeline.py, which we will build up as we advance through each step

1. Setup🔧

Begin by preparing the test environment: load the dependencies and initialize the shared objects in one go.

This saves the need to set up the resources from scratch in each test:

import unittest
import pandas as pd
from unittest import TestCase
from dummy_data_pipeline import load_data, filter_data, clean_data, aggregate_data

class TestDataPipeline(TestCase):

    # Setup test environment 
    def setUp(self):
        self.file_path  =   'test_data.csv'
        self.test_df    =   load_data(self.file_path)

2. Creation🎨✍

Write the tests required to validate and verify the individual components of the source code:

    # Validate the row count
    def test_row_count(self):
        expected_row_count  =   8
        actual_row_count    =   len(self.test_df)
        self.assertEqual(actual_row_count, expected_row_count, f"The actual row count {actual_row_count} does not match the expected row count {expected_row_count}")

    # Validate the filter function works as expected
    def test_filter_data(self):
        test_df                     =   pd.DataFrame({"passenger_count": [20, 30, 40]})
        filtered_df                 =   filter_data(test_df)
        selected_filtering_value    =   30
        self.assertTrue((filtered_df["passenger_count"] >= selected_filtering_value).all(), f"Filter operation doesn't work as expected - this one allows passenger counts of less than {selected_filtering_value}")

    # Validate the cleaning function works as expected
    def test_clean_data(self):
        test_df = pd.DataFrame({"route": ['Angola', 'Botswana', 'Cameroon', None]})
        clean_df = clean_data(test_df)
        self.assertFalse(clean_df.isnull().values.any(), "Cleaning operation didn't work as expected - this contains null values")
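
The pipeline also exposes an aggregate_data function that the tests above do not cover. As a rough sketch, assuming we only care that passenger counts are summed per route, a test for it could look like this:

    # Validate the aggregation function sums passenger counts per route
    def test_aggregate_data(self):
        test_df         =   pd.DataFrame({"route": ["A", "A", "B"], "passenger_count": [10, 20, 5]})
        aggregated_df   =   aggregate_data(test_df)
        self.assertEqual(aggregated_df.loc["A", "passenger_count"], 30, "Route A should total 30 passengers")
        self.assertEqual(aggregated_df.loc["B", "passenger_count"], 5, "Route B should total 5 passengers")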

3. Execution 🏃🧐

Run the tests and compare the actual results to the expected outcomes (expected vs actuals). Troubleshoot if the program highlights any variances or errors from the tests:

if __name__ == "__main__":
    unittest.main()

Adding the above code to the bottom of the test script lets us run the test suite automatically. Once this is added, you can run the tests from the terminal by navigating to the directory where the test script is located and running the following command:

python -m unittest
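
To run a specific test module with more detailed output, unittest's verbose flag can be used, for example:

python -m unittest -v test_dummy_data_pipeline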

4. Tear down🧹

Reset states and close all resources used for the tests once they are done:

    def tearDown(self):
        self.test_df = None

Setting self.test_df to None ensures the test_df object does not interfere with any other tests that may run afterwards.
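
If setUp had created temporary resources, such as a scratch file, tearDown is also the place to remove them. Here is a rough sketch under that assumption (the temporary-file setup below is hypothetical and not part of the pipeline above):

import os
import tempfile
import unittest

class TestDataPipelineWithTempFiles(unittest.TestCase):

    # Hypothetical example: setUp creates a temporary file for intermediate results
    def setUp(self):
        self.temp_output = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
        self.temp_output.close()

    def tearDown(self):
        # Remove the temporary file so no state leaks between test runs
        if os.path.exists(self.temp_output.name):
            os.remove(self.temp_output.name)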

What does a good and bad unit test look like?🌓

Good tests 😇✅

Here are some of the components of a good unit test:

  • Reliable✔- the tests can run multiple times and return the same results

  • Isolated🏝️ - the tested components do not depend on other components in the source code

  • Thorough🕵️‍♀️ - the tests cover every possible scenario and edge case linked to the code

  • Concise names📝 - the names given to each test are short and easy to understand

  • Automated🤖 - each test runs automatically to facilitate testing that is conducted frequently and with minimal human effort

1. Reliable✔

If we run the test over and over again we should expect the same outputs:

def test_row_count(self):
        expected_row_count  =   8
        actual_row_count    =   len(self.test_df)
        self.assertEqual(actual_row_count, expected_row_count, f"The actual row count {actual_row_count} does not match the expected row count {expected_row_count}")

2. Isolated🏝️

The individual units being tested are not coupled with other functions, modules or parts of the code:

def test_filter_data(self):
        test_df                     =   pd.DataFrame({"passenger_count": [20, 30, 40]})
        filtered_df                 =   filter_data(test_df)
        selected_filtering_value    =   30
        self.assertTrue((filtered_df["passenger_count"] >= selected_filtering_value).all(), f"Filter operation doesn't work as expected - this one allows passenger counts of less than {selected_filtering_value}")

This example is testing the filter_data operation only. The test assumes the function should only return rows with a total passenger count of 30 or more.

3. Thorough🕵️‍♀️

Every situation the code will encounter is taken into account in each test:

def test_clean_data(self):
        test_df = pd.DataFrame({"route": ['Angola', 'Botswana', 'Cameroon', None]})
        clean_df = clean_data(test_df)
        self.assertFalse(clean_df.isnull().values.any(), "Cleaning operation didn't work as expected - this contains null values")

This test checks whether the clean_data function has removed all the NaN values from the test data, if any were present. Other small, targeted tests like this can be created where they make sense (see the edge-case sketch below).
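
For instance, one edge case worth covering is data that contains no null values at all - cleaning it should not drop any rows. A minimal sketch under that assumption:

def test_clean_data_preserves_complete_rows(self):
        test_df = pd.DataFrame({"route": ['Angola', 'Botswana', 'Cameroon']})
        clean_df = clean_data(test_df)
        self.assertEqual(len(clean_df), len(test_df), "Cleaning operation dropped rows even though the data contained no null values")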

4. Concise names📝

The names picked for each test follow a consistent naming convention that is easy to read and understand at a glance (usually prefixed with test_xxx, like test_positive_integers)

Notice that the last three examples followed a similar naming convention, where each test carried the test_xxx prefix, making it easy to distinguish the tests from other objects within the script (test_row_count, test_filter_data and test_clean_data respectively).

5. Automated🤖

Little to no human intervention is needed to run each test. Production-level tests should be integrated into a CI/CD pipeline, where changes made to a data pipeline are detected automatically, triggering a series of test cases to confirm the changes can be integrated with the rest of the codebase. Once these tests have passed, the pipeline can deliver the changes to the staging branch, layer or zone, and/or deploy them to the production branch.

Bad tests 👿🛑

Some components of a bad unit test include the following:

  • Not replicable🚫 - the tests cannot be replicated in different environments

  • Limited coverage🕳️ - the tests fail to consider every use-case and edge scenario linked to the code

  • Overly complex🤯- the tests contain “over-the-top” complexities like long setup processes or tests that are too granular

  • Too sensitive🧪- the tests are too brittle to changes, which can lead to false positives even though the tested component isn’t broken, and/or false negatives when the component is broken, making the test results unreliable

1. Not replicable🚫

A test that cannot be replicated in different environments under the same conditions is not a good test:

def test_data_integrity(self):
        original_df = self.test_df.copy()
        self.test_df.iloc[5, 0] = "testing_data_modification"
        self.assertEqual(original_df.equals(self.test_df), True, "This test has modified the original data")

If you want to quickly find out whether a unit test is a good or bad one, check if it modifies the original data. The one in this example certainly does - modifying the data under test leads to unreliable results across runs.

This is what we can do to rectify this:

def test_data_integrity(self):
        new_df = load_data(self.file_path)
        self.assertTrue(self.test_df.equals(new_df), "This operation fails to preserve the data integrity during the load")

Now we can test the integrity of load_data's output without needing to modify the original data.

2. Limited coverage🕳️

A test that doesn’t target enough key areas to validate is providing limited coverage. Here’s an example of it:

def test_data_object_type(self):
        self.assertIsInstance(self.test_df, pd.DataFrame, "The object loaded is not a dataframe")

Other areas could be validated in this test without overloading it with use cases - for example, validating the structure and contents of the loaded dataframe.

Here’s how we can make this test more robust:

def test_data_structure_and_types(self):
        expected_columns        =   ["route", "passenger_count", "time_stamp", "driver_id", "vehicle_id", "is_weekend", "driver_name", "vehicle_model"]
        expected_dtypes         =   ["object", "int64", "object", "int64", "int64", "bool", "object", "object"]  # time_stamp loads as object because load_data does not parse dates
        self.assertListEqual(list(self.test_df.columns), expected_columns, "The actual data structure of the loaded data does not match the expected structure ")
        self.assertListEqual(list(self.test_df.dtypes.astype(str)), expected_dtypes, "The actual data types do not match the expected data types.")

Now this test covers a validation check on both the data structure and data types of the test data, which is more useful than its limited predecessor.

3. Overly complex 🤯

Sometimes a test can generate the expected output, but there is too much going on in the function:

def test_null_values_in_cells(self):
        expected_null_count     =   0 
        actual_null_count       =   0
        for _placeholder, row in self.test_df.iterrows():
            for value in row:
                if value is None:
                    actual_null_count += 1
        self.assertEqual(actual_null_count, expected_null_count, f"Data contains {actual_null_count} NULL values instead of {expected_null_count} NULL values")

If a function is overly engineered, it could be more difficult to read and manage over time.

Let’s try simplifying it:

def test_null_values_in_data(self):
        expected_null_count     =   0 
        actual_null_count       =   self.test_df.isnull().sum().sum()
        self.assertEqual(actual_null_count, expected_null_count, f"Data contains {actual_null_count} NULL values instead of {expected_null_count} NULL values")

This example replaces the complex for-loop with a simple one-line expression that counts the number of null values in the test data.

4. Too sensitive🧪

A test is considered a bad one if it is tightly coupled to specific data points in the test data. If those data points change, however small the change, the test will fail even though the component under test is not broken. This means the test is too sensitive - here’s an example:

def test_existence_of_driver_name(self):
        expected_driver_name    = 'John Smith'
        actual_driver_name      = self.test_df['driver_name'].iloc[0]
        self.assertEqual(actual_driver_name, expected_driver_name, f"Expected driver name '{expected_driver_name}' not found in database ... ")

This example searches for a specific driver’s name in the first row of the test data, assuming its presence indicates whether the data loading process was successful. If the order of the data changes, or the driver’s name changes, this test will fail even though the load_data operation isn’t broken.

Now let’s explore an approach that rectifies this:

def test_existence_of_driver_name(self):
        column = 'driver_name'
        self.assertIn(column, self.test_df.columns, f"The {column} column is not present in the test dataset...")
        self.assertTrue(self.test_df[column].notna().any(),  "No valid driver names are found in the dataset...")

Instead of testing for a specific driver’s name, we test that valid (non-null) names are present in the driver_name column. This makes the test less brittle while still validating that the load operation succeeded.

Test Frameworks available for Python🔬

  • unittest - in-built library in Python for unit testing

  • pytest - a popular third-party framework for testing software functionality and outputs (see the sketch after this list)

  • great_expectations - a tool for profiling, documenting and checking if data meets the expectations defined by the engineers
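
To give a feel for the difference, here is a rough sketch of the earlier filter test rewritten in pytest style - pytest discovers plain test_-prefixed functions and relies on bare assert statements instead of unittest's assertion methods (this assumes pytest is installed and the file sits alongside dummy_data_pipeline.py):

import pandas as pd

from dummy_data_pipeline import filter_data

# pytest discovers plain functions prefixed with test_
def test_filter_data_keeps_only_large_passenger_counts():
    test_df = pd.DataFrame({"passenger_count": [20, 30, 40]})
    filtered_df = filter_data(test_df)
    assert (filtered_df["passenger_count"] >= 30).all(), "Filter allowed passenger counts below 30"

Running pytest from that directory will pick the file up automatically, provided its name starts with test_.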

When should I use unit tests🤔👍?

You should consider unit testing when:

  • adding new features or functionalities to existing data pipeline source code

  • performing complex data operations involving business logic, like ingestions, transformations and orchestrations

  • you want to ensure new changes made to data solutions do not break or disrupt existing processes i.e. backward compatibility checks

  • the code uses a significant amount of resources and time to execute, making debugging an expensive endeavour

  • frequent modifications are made to the data products, to reduce the chance of unexpected behaviours and bugs sneaking in

When should I NOT use unit tests🤔👎?

Unit testing generally should not be used when:

  • it is difficult to reset or control the state of data across each processing stage

  • performing simple operations, like a standard SELECT statement on a sales table

  • the code interacts heavily with external systems, APIs or databases, in which case other testing approaches like integration tests may be more appropriate

Ultimately, if the cost of implementing unit testing outweighs the benefits, it is best to consider alternative approaches where the balance is reversed. This decision should be made in careful consultation with other engineers, giving qualified colleagues the opportunity to challenge the reasoning behind it.

Disclaimer: The points raised in this blog are generalizations about unit testing, and are therefore relative to each use case. Use your professional discretion to prioritize or ignore the points as they relate to your needs.

Feel free to reach out via my handles: LinkedIn | Email | Twitter