SOLID principles for Spark applications

Where PySpark meets SOLID principles💥

Preface🚀

This blog post explores whether PySpark can incorporate SOLID principles for data engineering tasks.

Here’s a series on SOLID principles with Python for data engineering tasks I’ve been working on:

I’ve also explored functional programming’s application in data engineering in these posts too:

What is Spark? 💥💡

Spark is a framework for processing large volumes of data distributed across multiple machines at the same time. It is originally written in Scala, a programming language that supports object-oriented programming (OOP) and functional programming (FP).

Spark mainly deals with immutable data types like RDDs and DataFrames. If a data type is immutable, this means that once it is created, it cannot be modified. The only way to change them is to create an entirely new structure that includes the desired changes.

What is PySpark?💥🐍

PySpark is the Python API for interacting with the services provided by Spark. This allows developers to leverage the power of Spark while using the simplicity of Python to facilitate the creation of robust applications that process large amounts of data

So Python programmers can stick with Python and take advantage of Spark’s distributed computing capabilities through the use of PySpark.

Object-oriented programming (OOP)👥

Object-oriented programming (OOP) is a style of programming that emphasises the use of objects. In OOP, data is stored in the instances of classes (not the classes themselves). So the class would have to first be read (or instantiated) into an object before the data can be stored in it. The data stored in a class’s instance is called “state”.

OOP often deals with mutable data, that is, data can be modified once it is created.

🔋Benefits of OOP with PySpark

  • Reusability 🔁 - Classes can be reused to create multiple new objects without creating code from scratch

  • Flexibility ⚙️ - Classes can be adjusted according to the changing needs required of the program

  • Modularity 🧱 - Code is organised into independent classes and methods, so modifying one section of code won’t impact other sections

⚠️Challenges of OOP with PySpark

  • Mutability 🔄 - Processing mutable data types in a distributed computing environment would cause state-related issues like trying to maintain consistency in processing the mutable data on multiple nodes

  • Hard to use inheritance or polymorphism 🚫 - PySpark currently lack sufficient support for these OOP concepts to build robust pipelines via OOP

  • Breaking encapsulation 🔓- RDDs and DataFrames are prone to compromising encapsulation in OOP because they expose data during processing in the worker nodes

When should I use OOP in PySpark?🤔💭

TLDR: When maintaining state across data pipelines is required. 🗃️🔄

While storing state for distributed computing is often discouraged (especially with mutable data), there are use cases where this can be justified. Let’s demonstrate with code examples:

We can start by setting up the configuration logic to feed our Spark application once we’re ready to set it up:

from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_config = (SparkConf()
          .setAppName("Stephen's OOP App for Blog")
          .setMaster("local")                           # Cluster: Standalone
          .set("spark.executors.instances", 3)          # Executors: 3
          .set("spark.executors.cores", 5)              # CPU cores: 5
          .set("spark.memory.fraction", "0.5")          # Heap space fraction - 50% storage memory, 50% execution memory
          .set("spark.executor.memory", "2g")           # Execution memory: 2 GB    
         )

So let’s go through the reasons why OOP in PySpark would make sense:

  1. For managing system resources appropriately 💻🔄- handling system resources like database connections, sensitive files and environment variables may require a standardised and secure protocol for releasing once users are done with them.

A good example of this would be using methods to start and stop a Spark Session, like so:

# Create a class to initialize and terminate Spark Session
class SparkSessionManager:
    def __init__(self):
        self.spark = None


        # Create a method for starting the Spark Session
    def start_spark_session(self, config):
        self.spark = SparkSession.builder.config(conf=config).getOrCreate()
        print("Started Spark Session. ")


        # Create a method for stopping the Spark Session
    def stop_spark_session(self):
        if self.spark is not None:
            self.spark.stop()
            print("Stopped Spark Session.")

The SparkSessionManager class contains two methods:

  1. start_spark_session - for starting the Spark Session

  2. stop_spark_session - for stopping the Spark Session

from pyspark.sql import Row

# Instantiate object for Spark Session
spark_session_manager = SparkSessionManager()

# Activate Spark Session
spark_session_manager.start_spark_session(spark_config)

# Create dummy data 
data_1 = [
        Row(name='John', age=25, city='London'),
        Row(name='Matilda', age=44, city='Shanghai'),
        Row(name='Ashley', age=21, city='Lagos'),
        Row(name='Mike', age=19, city='Doncaster'),
        Row(name='Charlie', age=53, city='Abuja'),
        Row(name='Brian', age=27, city='New York'),
       ]

# Create a Spark dataframe
df_1 = spark_session_manager.spark.createDataFrame(data_1)

# Display data
df_1.show()

### Output:
+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|   John| 25|   London|
|Matilda| 44| Shanghai|
| Ashley| 21|    Lagos|
|   Mike| 19|Doncaster|
|Charlie| 53|    Abuja|
|  Brian| 27| New York|
+-------+---+---------+

We can then read the data into a Spark dataframe and perform any operation we require from here.

Then once we’re done, we can terminate the Spark Session:

# Terminate Spark session
spark_session_manager.stop_spark_session()
  1. For building tools that share behaviour across transformations 👥🛠️- This style of programming is particularly useful for building applications that share state across data transformations. Examples like this are machine learning tools that require data stored in a class to perform predictions in real time.

Let’s add more data to demonstrate this use case:

# Create more dummy data 
data_2 = [
        Row(name='Sam', age=65, city='Sydney'),
        Row(name='Louisa', age=24, city='Gatwick'),
        Row(name='Zeeshan', age=31, city='Amsterdam'),
        Row(name='Todd', age=58, city='Paris'),
        Row(name='Samuel', age=26, city='Cape Town'),
        Row(name='Michaella', age=44, city='Toronto'),
       ]

# Create another Spark dataframe
df_2 = spark_session_manager.spark.createDataFrame(data_2)

# Display data
df_2.show()

### Output:
+---------+---+---------+
|     name|age|     city|
+---------+---+---------+
|      Sam| 65|   Sydney|
|   Louisa| 24|  Gatwick|
|  Zeeshan| 31|Amsterdam|
|     Todd| 58|    Paris|
|   Samuel| 26|Cape Town|
|Michaella| 44|  Toronto|
+---------+---+---------+

We can use the transform_dataframe method from the PreprocessingManager class to apply the same sequence of transformations to each dataframe read into Spark:

from pyspark.sql import DataFrame

# Create a class for pre-processing data 
class PreprocessingManager:
    def __init__(self, df: DataFrame):
        self.df = df

    # Create a method for applying custom transformation tasks
    def transform_dataframe(self):
        self.df = self.df.dropna()
        self.df = self.df.filter(self.df["age"] > 30)

        return self.df


# Instantiate the object for preprocessing data
pre_processor_1    =   PreprocessingManager(df_1)
pre_processor_2    =   PreprocessingManager(df_2)

So let’s apply the transformations - we’ll start with the first dataframe:

# Transform the dataframes 

# 1. 
transformed_df_1   =   pre_processor_1.transform_dataframe()
transformed_df_1.show()

#### Output 1:
+-------+---+--------+
|   name|age|    city|
+-------+---+--------+
|Matilda| 44|Shanghai|
|Charlie| 53|   Abuja|
+-------+---+--------+

…and now the second one …:

# 2.
transformed_df_2   =   pre_processor_2.transform_dataframe()
transformed_df_2.show()

#### Output 2:
+---------+---+---------+
|     name|age|     city|
+---------+---+---------+
|      Sam| 65|   Sydney|
|  Zeeshan| 31|Amsterdam|
|     Todd| 58|    Paris|
|Michaella| 44|  Toronto|
+---------+---+---------+
  1. For encapsulating complex operations 📦🔐- although this sounds like a contradiction to the last point for the “⚠️Challenges of OOP with PySpark”, encapsulation in PySpark is still possible for organising code into methods for

    1. hiding the internal state of specific operations, and

    2. exposing necessary functionalities e.g. for data validation

# Create a class for validating data 
class IDataValidator:


    # Create a method for checking if the field selected is positive
    @staticmethod
    def check_if_data_is_positive(df, column):
        return df.filter(df[column] > 0)


    # Create a method for checking the people over 40 years of age
    @staticmethod
    def check_people_over_40(df, age_field):
        return df.filter(df[age_field] > 40)


    # Create a method for checking if the field contains any NULL values
    @staticmethod
    def check_if_column_contains_null(df, column):
        return df.filter(df[column].isNotNull())

Now let’s run the checks, starting with the first:

# Run 1st check 
validated_data_1 = IDataValidator.check_if_data_is_positive(df_1, "age")
validated_data_1.show()

### Output 1: 
+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|   John| 25|   London|
|Matilda| 44| Shanghai|
| Ashley| 21|    Lagos|
|   Mike| 19|Doncaster|
|Charlie| 53|    Abuja|
|  Brian| 27| New York|
+-------+---+---------+

…and now the second:

# Run 2nd check 
validated_data_2  = IDataValidator.check_people_over_40(df_2, "age")
validated_data_2.show()

## Output 2:
+---------+---+-------+
|     name|age|   city|
+---------+---+-------+
|      Sam| 65| Sydney|
|     Todd| 58|  Paris|
|Michaella| 44|Toronto|
+---------+---+-------+

The logic of each method is abstracted away from the user, while the method itself can be called to validate the data. The user doesn’t need to know “how” the method performs its job, they only need to call the method itself.

Functional programming (FP)🌟

Function programming (FP) is a branch of programming that emphasizes the use of functions and promotes the use of immutable data types.

🔋Benefits of FP with PySpark

  • Immutability 🛡- FP’s emphasis on immutability aligns with Spark’s immutable data structures, reducing state-related issues during parallel computing

  • Legibility 📖- FP promotes the use of independent functions which makes the codebase easier to read and maintain

  • Stateless computing 🌍 - Functions don’t focus on maintaining state so allows PySpark operations to run independently without worrying about side effects

⚠️Challenges of FP with PySpark

  • Steep learning curve 🎢📚- From those coming from an OOP, the principles of FP can be challenging to get around

  • Limited tooling 🧰- There aren’t many libraries that support FP for PySpark, limiting the options for robust data pipelines

  • Resource-intensive 🛠 - FP’s emphasis on immutability means new data is created after each transformation, which increases resource utilization🍔

When should I use FP with PySpark?🤔💭

TLDR: when data transformation jobs need to be done with immutable data 🔒📋

Let’s explore this in more depth:

  1. For running Spark jobs on immutable data 📊💎- FP works best when dealing with immutable data types. Mutating data in a distributed environment can lead to data races, where multiple threads access and process the same data simultaneously, which can introduce unexpected bugs into the data transformation processes. This makes the debugging process difficult because the Spark program may produce outputs which are hard (or impossible) to replicate.

Here’s an example of processing immutable data:

from pyspark.sql import Row
from pyspark.sql import functions as F

# Create dummy data 
data_1 = [
        Row(name='John', age=25, city='London'),
        Row(name='Matilda', age=44, city='Shanghai'),
        Row(name='Ashley', age=21, city='Lagos'),
        Row(name='Mike', age=19, city='Doncaster'),
        Row(name='Charlie', age=53, city='Abuja'),
        Row(name='Brian', age=27, city='New York'),
       ]

# Create a Spark dataframe
df_1 = spark.createDataFrame(data_1)

# Transform data
calculate_dob = F.expr("date_add(current_date(), -cast(age * 365 as int))")
df_2 = df_1.withColumn("date_of_birth", calculate_dob)

Here we create a dataset, data_1, and a Spark dataframe, df_1. To add another column, we create a custom operation using F.expr, and feed it into another operation that creates another dataframe, df_2.

So instead of altering the existing df_1 dataframe, we copy it and add the date_of_birth field to it, and name it df_2. This approach embodies the concept of immutability, reducing the risk of data races occurring and making debugging easier.

This outputs:

# Display data
df_2.show()

### Output: 
+-------+---+---------+-------------+
|   name|age|     city|date_of_birth|
+-------+---+---------+-------------+
|   John| 25|   London|   1998-06-11|
|Matilda| 44| Shanghai|   1979-06-16|
| Ashley| 21|    Lagos|   2002-06-10|
|   Mike| 19|Doncaster|   2004-06-09|
|Charlie| 53|    Abuja|   1970-06-18|
|  Brian| 27| New York|   1996-06-11|
+-------+---+---------+-------------+

As you can see df_2 still maintains the existing data from df_1, but now has the date_of_birth field included this time. So df_1 remains unchanged and the immutability principle is successfully in play.

  1. For building pipelines with predictable outputs 🧩🔍- due to functions being stateless in FP, a function’s output depends entirely on its input. This level of predictability is useful for testing and troubleshooting distributed systems
from pyspark.sql import DataFrame

# Create function to change a column's values to uppercase
def convert_to_uppercase(df: DataFrame, column: str) -> DataFrame:
    return df.withColumn(f"{column}_in_uppercase", F.upper(df[column]))

# Apply operation 
transformed_df_1 = convert_to_uppercase(df_1, "name")

Here we define the convert_to_uppercase function, which takes a dataframe and a column name as inputs, and then returns a new dataframe with the values of the selected column converted to uppercase.

This function does not store any data and therefore does not store any state. In other words, its output is solely dependent on its input parameters.

This convert_to_uppercase function is applied to the df_1 dataframe and the name field. Every time we pass these inputs into convert_to_uppercase, it will always return the same values in uppercases for the name column, making the operation a predictable and reliable one.

This yields the following output:

# Display results
transformed_df_1.show()

### Output:

+-------+---+---------+-----------------+
|   name|age|     city|name_in_uppercase|
+-------+---+---------+-----------------+
|   John| 25|   London|             JOHN|
|Matilda| 44| Shanghai|          MATILDA|
| Ashley| 21|    Lagos|           ASHLEY|
|   Mike| 19|Doncaster|             MIKE|
|Charlie| 53|    Abuja|          CHARLIE|
|  Brian| 27| New York|            BRIAN|
+-------+---+---------+-----------------+
  1. For leveraging composition in building complex dataflows 🔗🔨- function composition enables the outputs of one function to be the inputs of another function, which makes building and reading the source code of pipelines with complex behaviours easier

We’ll take advantage of the transform function in PySpark, a function that allows us to chain multiple transformation operations and apply them to one dataframe

from typing import List
from pyspark.sql import DataFrame

# Create function to sort the dataframe by a list of columns
def sort_dataframe(df: DataFrame, columns: List[str]) -> DataFrame:
    return df.sort(*columns)

# Create a function for filtering by a minimum age value
def filter_by_age(df: DataFrame, min_age: int) -> DataFrame:
    return df.filter(df["age"] > min_age)

# Create lambda functions to apply one-line transformations

apply_case_conversions  =  lambda df: convert_to_uppercase(df, "name")
apply_sorting_logic     =  lambda df: sort_dataframe(df, ["city", "name"])
apply_age_filtering     =  lambda df: filter_by_age(df, 20)

# Use composition via "transform" operaton to transform dataframe

transformed_df_2   = (df_1
                   .transform(apply_case_conversions)
                   .transform(apply_sorting_logic)
                   .transform(apply_age_filtering)
                    )

The transform function is used 3 times to form higher-order functions for the transformation tasks. Let’s break down their applications:

  • transform(apply_case_conversions) - The df_1 dataframe is passed into the apply_case_conversions function to convert the selected field into uppercase, and then the result of this operation is used as the input for the next function

  • transform(apply_sorting_logic) - this function takes the output of the previous operation (i.e. df_1 dataframe with the “name” column in uppercases) and sorts it by the city and name fields. The output is then fed into the next function.

  • transform(apply_age_filtering) - this function takes the preceding operation’s output (i.e. the df_1 dataframe with the case conversion and sorting operations applied) and then filters the rows based on the age column, and returns the final transformed dataframe.

So this composition produces the following results:

# Display results
transformed_df_2.show()

### Output:

+-------+---+--------+-----------------+
|   name|age|    city|name_in_uppercase|
+-------+---+--------+-----------------+
|Charlie| 53|   Abuja|          CHARLIE|
| Ashley| 21|   Lagos|           ASHLEY|
|   John| 25|  London|             JOHN|
|  Brian| 27|New York|            BRIAN|
|Matilda| 44|Shanghai|          MATILDA|
+-------+---+--------+-----------------+

What are SOLID principles?📊

SOLID principles are a collection of design principles that help design software solutions easy to test, read and maintain over their lifecycles.

You can read more on this in the 1st part of this series here.

1. Single responsibility principle (SRP) 📌🧰

The single responsibility principle (SIP) states a module should only change for one reason. Another way of putting it is this: every class, method or function must only have one responsibility.

In the context of OOP, this means every class or method must be separated by responsibilities. In FP, this would be every function separated by responsibilities.

Let’s explore how both are applied differently with PySpark:

A. OOP Application👥📌

class IDatabaseConnector:
    def __init__(self, connection_string):
        self.connection_string = connection_string

    def connect_to_db(self):
        print("Connecting to database...")

    def close_connection_to_db(self):
        print("Closing connection to database...")

db_connector = IDatabaseConnector(connection_string="HOST: local, PORT: 1234")
db_connector.connect_to_db()

Here we create the IDatabaseConnector class to handle the database connection and disconnection (via the connect_to_db and close_connection_to_db methods respectively) - it’s safe to say its sole responsibility revolves around managing the database connection, thus, adhering to the SRP.

### Output:

Connecting to database...

B. FP Application🌟📌

from pyspark.sql.functions import col, when

# Create function for depositing money 
def deposit_money(df, account_number: int, amount: float):
    return df.withColumn('balance', when(col('account_number') == account_number, col('balance') + amount).otherwise(col('balance')))

# Create function for withdrawing money
def withdraw_money(df, account_number: int, amount: float):
    if df.filter((col('account_number') == account_number) & (col('balance') < amount)).count() > 0:
        raise ValueError("Unfortunately your balance is insufficient for any withdrawals right now...")
    return df.withColumn('balance', when(col('account_number') == account_number, col('balance') - amount).otherwise(col('balance')))

# Create function for printing bank balance
def print_balance(df):
    return df.select(col('account_number'), col('balance'))

# Create function for changing account details
def change_account_number(df, current_account_number: int, new_account_number: int):
    return df.withColumn('account_number', when(col('account_number') == current_account_number, new_account_number).otherwise(col('account_number')))

# Create Spark dataframe with dummy data
bank_data = [{'account_number': 12345678, 'balance': 540.00}]
starting_df = spark.createDataFrame(bank_data)

This example contains multiple functions with distinct responsibilities around managing activities linked to a bank account, like depositing money (deposit_money), withdrawing money (withdraw_money), printing the balance (print_balance) and changing account numbers (change_account_number).

# Deposit money 
df_1 = deposit_money(starting_df, account_number=12345678, amount=50)
my_current_account_1 = print_balance(df_1)
my_current_account_1.show()

## Output 1:
+--------------+-------+
|account_number|balance|
+--------------+-------+
|      12345678|  590.0|
+--------------+-------+

# Withdraw money
df_2 = withdraw_money(df_1, account_number=12345678, amount=500)
my_current_account_2 = print_balance(df_2)
my_current_account_2.show()

### Output 2:
+--------------+-------+
|account_number|balance|
+--------------+-------+
|      12345678|   90.0|
+--------------+-------+

Each function is independent and has a single responsibility, making this example consistent with SRP from a functional programming perspective.

2. Open-close principle (OCP)🚪🔐

This principle states that a module should be open for extension but closed for modification.

In other words, your code should be written in such a way that new functionality can be added without requiring changes to the existing code.

In OOP, this applies to classes and methods, while FP applies this to functions only. Isolating the classes, methods and functions in an appropriate manner improves the maintainability and testability of your codebase.

A. OOP Application👥📌

class IFileProcessor:
    def read_file(self, file: str):
        print(f"Reading {file} file into system... ")

    def check_file_details(self, file: str, row_count: int):
        print(f"File name: {file},  No of rows: {row_count} ")



file_processor = IFileProcessor()
file_processor.check_file_details(file="Test File", row_count=3)

From an OOP perspective, the IFileProcessor interface in this example adheres to the OCP because we can add more features without modifying the existing code within the class. If we wanted to add a load_file method to the class, this can be achieved without interrupting the code structure of the read_file or check_file_details methods.

# Output:

File name: Test File,  No of rows: 3

B. FP Application🌟📌

from typing import Callable 
from pyspark.sql.functions import col

# Create the dummy data
data = [{'object': 'A', 'sensor': 'temperature'},
        {'object': 'B', 'sensor': 'ultrasonic'},
        {'object': 'C', 'sensor': 'infrared'} 
       ]

# Create the Spark dataframe
df = spark.createDataFrame(data)

# Create higher-order function that receives different sensors
def detect_with_sensor(df, *sensors: Callable) -> None:
    for i, sensor in enumerate(sensors):
        print(f'Sensor {i + 1} ')
        sensor(df)

# Express sensors as functions
def use_temperature_sensor(df) -> None:
    print("Detecting objects using temperature sensor...")
    df.filter(col('sensor') =='temperature').show()

def use_ultrasonic_sensor(df) -> None:
    print("Detecting objects using ultrasonic sensor...")
    df.filter(col('sensor')=='ultrasonic').show()

def use_infrared_sensor(df) -> None:
    print("Detecting objects using infrared sensor...")
    df.filter(col('sensor')=='infrared').show()

The detect_with_sensor function aligns with the OCP from a functional programming standpoint. This is because it can accept different sensors as input without any changes to the detect_with_sensor function itself.

Let’s pass the functions into the detect_with_sensor function and observe the results:

# Detect the objects using different sensors
detect_with_sensor(df, use_infrared_sensor, use_ultrasonic_sensor, use_temperature_sensor)

### Output:

Sensor 1 
Detecting objects using infrared sensor...
+------+--------+
|object|  sensor|
+------+--------+
|     C|infrared|
+------+--------+

Sensor 2 
Detecting objects using ultrasonic sensor...
+------+----------+
|object|    sensor|
+------+----------+
|     B|ultrasonic|
+------+----------+

Sensor 3 
Detecting objects using temperature sensor...
+------+-----------+
|object|     sensor|
+------+-----------+
|     A|temperature|
+------+-----------+

We can add as many sensors as required without modifying the existing code, so therefore it is open for extension and closed for modification.

3. Liskov substitution principle (LSP)👩‍🔬🔄

The Liskov substitution principle (LSP) states that a derived module should be able to replace a base module without any unexpected behaviour.

The OOP interpretation of this rule states that a parent class should be able to be exchanged with this child class without any unexpected issues occurring in/from the code. The FP interpretation states that the initial function’s signature (input-and output types) should match the substitutional function’s signature without unexpected behaviour occurring also.

A. OOP Application👥📌

class EnvConfig(ABC):
    @abstractmethod
    def get_config(self, object_name, no_of_objects):
        pass


class MongoDBConfig(EnvConfig):
    def get_config(self, object_name, no_of_objects):
        print(f"MONGODB - Database Name: {object_name},  No of tables: {no_of_objects} ")



class AzureBlobConfig(EnvConfig):
    def get_config(self, object_name, no_of_objects):
        print(f"AZURE BLOB - Blob Name: {object_name}, No of blob containers: {no_of_objects} ")

mongodb_config      =  MongoDBConfig()
azure_blob_config   =  AzureBlobConfig()

mongodb_config.get_config(object_name="dummy_db",no_of_objects=3)
azure_blob_config.get_config(object_name="dummy_blob", no_of_objects=16)

The EnvConfig class is the base class (or parent class), while the MongoDBConfig and AzureBlobConfig are the derived classes (or child classes). The derived classes in this example accept the same number of input parameters into the get_config method, which both the base class and the derived classes also share.

This means we shouldn’t see any new exceptions if we substitute any derived class wherever the EnvConfig class is expected to operate, and therefore satisfy the LSP in this example.

### Output:

MONGODB - Database Name: dummy_db,  No of tables: 3 
AZURE BLOB - Blob Name: dummy_blob, No of blob containers: 16

B. FP Application🌟📌

from typing import Callable
from pyspark.sql.functions import col, when

# Create function for using appliances with temperature controls
def use_temperature_controlled_item(df, 
                                    item: str, 
                                    turn_on: Callable[[str], None], 
                                    turn_off: Callable[[str], None], 
                                    change_temperature: Callable[[str], None]) -> None:
    df = turn_on(df, item)
    df = turn_off(df, item)
    df = change_temperature(df, item)

    return df

# Create function for turning refridgerator on
def turn_on_fridge(df, item: str) -> None:
    print(f"{item} turned on.")
    return (df.withColumn('status', 
                          when(col('item')==item, 'on')
                          .otherwise(col('status')
                                    )))

# Create function for turning refridgerator off
def turn_off_fridge(df, item: str) -> None:
    print(f"{item} turned off.")
    return (df.withColumn('status', 
                          when(col('item')==item, 'off')
                          .otherwise(col('status')
                                    )))

# Create function for changing refridgerator temperature
def change_temperature_of_fridge(df, item:str) -> None:
    print(f"{item} temperature changed.")
    return (df.withColumn('temperature', 
                         when(col('item')==item, col('temperature') + 1)
                         .otherwise(col('temperature')
                                   )))

# Create Spark dataframe
data = [{'item': 'fridge', 'status': 'off', 'temperature': 4}]
starting_df = spark.createDataFrame(data)

starting_df.show()

### Output: 
+------+------+-----------+
|  item|status|temperature|
+------+------+-----------+
|fridge|   off|          4|
+------+------+-----------+

From a FP perspective, aligning with LSP means ensuring the original function’s input-output signature is identical to the substitute function’s input-output so that no unexpected behaviour is returned to the user.

In this case, the use_temperature_controlled_item function takes in three inputs - turn_on, turn_off and change_temperature, in which each function accepts a Spark DataFrame and a string as input and returns another Spark DataFrame as its output. We also have three standalone functions for operating the refrigerator with - turn_on_fridge, turn_off_fridge and change_temperature_of_fridge, which also accepts a Spark Dataframe and a string and outputs another DataFrame too.

This means these three functions can be substituted for the use_temperature_controlled_item function without any errors returning to the user, as demonstrated below:

# Change the state of the refridgerator 
change_of_state_df = use_temperature_controlled_item(starting_df, 'fridge', turn_on_fridge, turn_off_fridge, change_temperature_of_fridge)
change_of_state_df.show()

### Output:

fridge turned on.
fridge temperature changed.
fridge turned off.
+------+------+-----------+
|  item|status|temperature|
+------+------+-----------+
|fridge|   off|          5|
+------+------+-----------+

4. Interface segregation principle (ISP)🧩⛔

The interface segregation principle (ISP) states that a module shouldn't be forced to adopt functionalities it isn’t designed or expected to use.

In OOP, this principle is violated if a class contains methods its subclass doesn't need or may not make real-world sense to use. In FP, the principle is violated if a function is forced to depend on a function it doesn’t need to make it work.

A. OOP Application👥📌

class CloudUploader:
    @abstractmethod
    def upload_to_cloud(self):
        pass


class DBUploader:
    @abstractmethod
    def upload_to_db(self):
        pass


class S3DataUploader(CloudUploader):
    def upload_to_cloud(self):
        print("Uploading files to S3 bucket...")


class PostgresDataUploader(DBUploader):
    def upload_to_db(self):
        print("Uploading tables to Postgres database...")

s3_uploader = S3DataUploader()
s3_uploader.upload_to_cloud()

postgres_uploader = PostgresDataUploader()
postgres_uploader.upload_to_db()

This PySpark code follows the ISP by maintaining separate interfaces for CloudUploader and DBUploader. This means any class that uses these interfaces are not forced to use methods they do not require. For instance, the S3DataUploader class only needs to load files into an S3 bucket via the upload_to_cloud, and the PostgresDataUploader class only needs to load data into a Postgres database through the upload_to_db method.

So implementing these objects correctly would return this output:

### Output:

Uploading files to S3 bucket...
Uploading tables to Postgres database...

We’ve managed to segregate the functions so that they do not interfere with each other. In other words, the separation of concerns in this example is evident because each class is doing what they need to do while adhering to ISP from an OOP standpoint.

B. FP Application🌟📌

from typing import Callable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, when

# Create function for applying transformation functions 
def apply_transformation_to_movie_data(transform_function: Callable[ [DataFrame], DataFrame ], df: DataFrame) -> DataFrame:
    return transform_function(df)

# Create Spark Session
spark = SparkSession.builder.appName("FP Application of ISP with Spark").getOrCreate()

# Create Spark dataframe
movie_data = [('Inception', 2010),
              ('Rush Hour', 1998),
              ('Avengers, Endgame', 2019),
              ('Bad Boyz', 1995),
              ('John Wick', 2014)
             ]

movie_df_1 = spark.createDataFrame(movie_data, ["title", "year"])

# Create function for adding ratings to movies 
def add_rating(df: DataFrame) -> DataFrame:
    return (df.withColumn("rating", when(col("title") == "Inception", 8.8)
                                   .when(col("title") == "Rush Hour", 7.0)
                                   .when(col("title") == "Avengers: Endgame", 8.4)
                                   .when(col("title") == "Bad Boyz", 6.9)
                                   .otherwise(7.4)
                         ))

# Create function for adding awards to movies 
def add_awards(df: DataFrame) -> DataFrame:
    return (df.withColumn("awards", when(col("title") == "Inception", 4)
                                   .when(col("title") == "Rush Hour", 0)
                                   .when(col("title") == "Avengers: Endgame", 0)
                                   .when(col("title") == "Bad Boyz", 0)
                                   .otherwise(0)
                         ))

In functional programming, ISP is achieved by using pure functions that each perform a single task. The apply_transformation_to_movie_data function accepts a function that performs a transformation activity on a Spark dataframe. The add_rating and add_awards functions are both functions that perform two separate tasks. We pass both through the apply_transformation_to_movie_data function at different times without forcing any transformation functions to use other operations they do not use.

# Display results
movie_df_2 = apply_transformation_to_movie_data(add_rating, movie_df_1)
movie_df_2.show()

### Output 1:
+-----------------+----+------+
|            title|year|rating|
+-----------------+----+------+
|        Inception|2010|   8.8|
|        Rush Hour|1998|   7.0|
|Avengers, Endgame|2019|   7.4|
|         Bad Boyz|1995|   6.9|
|        John Wick|2014|   7.4|
+-----------------+----+------+

movie_df_3 = apply_transformation_to_movie_data(add_awards, movie_df_2)
movie_df_3.show()

### Output 2:
+-----------------+----+------+------+
|            title|year|rating|awards|
+-----------------+----+------+------+
|        Inception|2010|   8.8|     4|
|        Rush Hour|1998|   7.0|     0|
|Avengers, Endgame|2019|   7.4|     0|
|         Bad Boyz|1995|   6.9|     0|
|        John Wick|2014|   7.4|     0|
+-----------------+----+------+------+

5. Dependency inversion principle (DIP)📲 🧲

The dependency inversion principle (DIP) states that high-level modules should not depend on low-level modules, and both should depend on abstractions only.

OOP’s interpretation of this states that high-level and low-level modules ( concrete classes) should only depend on abstract classes. FP’s interpretation states that high-level and low-level modules (functions and their internal behaviours) should only depend on abstractions (input functions passed into other functions) i.e. dependency injections.

By making the modules depend on abstract implementations instead of concrete ones, this principle increases the level of loose coupling in the program’s code, making it easier to extend the program’s functionality without modifying the existing code.

A. OOP Application👥📌

class DataProcessor(ABC):
    @abstractmethod
    def read_data(self):
        pass



class CSVProcessor(DataProcessor):
    def __init__(self, spark: SparkSession, file_path: str):
        self.spark = spark 
        self.file_path = file_path 

    def read_data(self):
        print(f"Reading '{self.file_path}' CSV file into program ... ")
        return self.spark.read.csv(self.file_path)


class DeltaProcessor(DataProcessor):
    def __init__(self, spark: SparkSession, file_path: str):
        self.spark = spark 
        self.file_path = file_path 


    def read_data(self):
        print(f"Reading '{self.file_path}' delta file into program ... ")
        return self.spark.read.delta(self.file_path)

spark = SparkSession.builder.appName("Another Test App for SDW's blog").getOrCreate()

csv_processor = CSVProcessor(spark, "test_location/test_file.csv")
delta_processor = DeltaProcessor(spark, "test_location/test_file.delta")

csv_processor.read_data()
delta_processor.read_data()

This example is aligned with the DIP because the CSVProcessor and DeltaProcessor class are both low-level modules that depend on the DataProcessor, which is an abstract class. By having an abstraction created, the DataProcessor allows other concrete classes to be implemented from it in a similar way the CSVProcessor and DeltaProcessor class were created.

### Output:

Reading 'test_location/test_file.csv' CSV file into program ...
Reading 'test_location/test_file.delta' delta file into program ...

B. FP Application

from typing import Callable
from pyspark.sql import DataFrame, SparkSession, Row
from pyspark.sql.functions import col, when, lit, round
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Specify food schema

food_schema = StructType([
    StructField("food_type", StringType(), True),
    StructField("calories", IntegerType(), True),
    StructField("ratings", DoubleType(), True),
    StructField("price", DoubleType(), True)

])

# Create Spark Session
spark = SparkSession.builder.appName("FP App of DIP").getOrCreate()

# Create Spark dataframe
food_data = [Row(food_type="Chips", calories=50,    ratings=6.5, price=3.99),
             Row(food_type="Salad", calories=100,   ratings=4.0,   price=5.99),
             Row(food_type="Burger", calories= 200, ratings=8.5, price=7.99),
             Row(food_type="Soup",  calories= 130, ratings=7.0, price=6.99)
            ]

food_df = spark.createDataFrame(food_data, food_schema)

# Create function for renaming columns 
def rename_columns(df: DataFrame) -> DataFrame:
    return (df.withColumnRenamed("food_type", "item")
              .withColumnRenamed("calories", "calories_in_kcal")
              .withColumnRenamed("ratings", "customer_ratings")
           )

# Create function for adding food categories
def add_food_category(df: DataFrame) -> DataFrame:
    return (df.withColumn("food_category", F.when(F.col("item").isin(["Pizza", "Burger", "Hotdog"]), "Fast Food")
                          .when(F.col("item").isin(["Soup", "Salad"]), "Healthy")
                          .otherwise("Others")
                         ))

# Create function for adding vegetarian flag 
def add_vegetarian(df: DataFrame) -> DataFrame:
    return (df.withColumn("is_vegetarian", 
                          F.when(F.col("item").isin(["Salad", "Soup"]), True)
                          .otherwise(False)
                         ))

# Execute transformations
transformed_food_df = (food_df
                       .transform(rename_columns)
                       .transform(add_food_category)
                       .transform(add_vegetarian)
                      )

In this example, the abstractions are the functions that are passed into each transform function applied to the initial food_df dataframe (i.e. the rename_columns, add_food_category and add_vegetarian functions), where the high-level module is considered the entire pipeline generating the transformed_food_df dataframe, and the low-level modules would be the implemented logic within each of the abstract functions mentioned, making this functional programming example in harmony with the DIP.

# Display results
transformed_food_df.show()

### Output: 
+------+----------------+----------------+-----+-------------+-------------+
|  item|calories_in_kcal|customer_ratings|price|food_category|is_vegetarian|
+------+----------------+----------------+-----+-------------+-------------+
| Chips|              50|             6.5| 3.99|       Others|        false|
| Salad|             100|             4.0| 5.99|      Healthy|         true|
|Burger|             200|             8.5| 7.99|    Fast Food|        false|
|  Soup|             130|             7.0| 6.99|      Healthy|         true|
+------+----------------+----------------+-----+-------------+-------------+

Why PySpark is more suited for functional programming (FP) than OOP📊🥊

PySpark favours the functional programming (FP) paradigm more than OOP. Here are some of the reasons why:

  • Immutability 🛡️- As mentioned earlier, PySpark deals more with immutable data types, which is one of the key tenets of functional programming

  • Parallelism 🧩- FP reduces the chances of side effects occurring, which makes it easy to split tasks across different machines

  • Stateless computing 🌍- PySpark operations can run independently with FP because it does not prioritize managing state as OOP does

Can SOLID principles still be applied to PySpark?🧐🔩

Absolutely! While this article appears to imply PySpark is more favourable for FP use cases, this doesn’t mean PySpark cannot reap the benefits SOLID principles can bring, even through the OOP paradigm.

It simply means engineers need to use their professional judgement to assess their project needs and incorporate the principles as appropriate, as each project would require different paradigms to guide their development lifecycles

For instance, if a project needs state management, OOP would be the more suitable choice. However, if the project prioritizes distributed computing with immutable data structures, FP would be the more viable option.

Conclusion🏁

Integrating every SOLID principle into every project is easier said than done, so it is important to understand when to prioritize certain principles over others, or incorporate a few and leave the rest. The key lies with the architects and engineers driving the data engineering tasks of the project.

Feel free to reach out via my handles: LinkedIn| Email | Twitter