When dealing with complex data transformation logic, the key is to break it down into small, manageable, and testable functional units. This ensures clarity and ease of maintenance throughout your project.
The Spark DataFrame API offers a seamless way to manipulate structured data. One particularly handy method within this API is .transform(), which allows concise chaining of custom transformations and thereby facilitates complex data processing pipelines.
In this blog, we’ll embark on a journey to understand the bits and pieces of transformation chains using PySpark, starting from simple transformations and gradually delving into more advanced scenarios.
The Data
Let’s consider a Spark DataFrame containing information about individuals including their name, age, and email address.
from pyspark.sql import SparkSession

# Create a Spark session (if one is not already available, e.g. in a notebook)
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [
("François Dupont", 20, "francoisdp@wanadoo.com"),
("Zinedine Zidane", 25, "zinedinezidane@goat.com"),
("Napoleon Bonaparte", 30, "napoleonbonaparte@laposte.com"),
("Louise Parker", 18, "louiseparker@yahoo.com")
]
columns = ["name", "age", "email"]
df = spark.createDataFrame(data, columns)
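Before transforming anything, a quick look at the DataFrame confirms what we are starting from (values come from the data above; the exact console formatting of show() may differ):
df.show(truncate=False)
# name               | age | email
# François Dupont    | 20  | francoisdp@wanadoo.com
# Zinedine Zidane    | 25  | zinedinezidane@goat.com
# Napoleon Bonaparte | 30  | napoleonbonaparte@laposte.com
# Louise Parker      | 18  | louiseparker@yahoo.com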
The goal is to perform several transformations on this DataFrame:
- Add a column indicating whether an individual is older than 21, named “is_adult”.
- Extract the first name and last name from the “name” column.
- Extract the domain from the email addresses.
Escaping the Spaghetti Code Nightmare
Let’s begin by examining a spaghetti code example that performs various transformations in Spark without any structure or organization:
from pyspark.sql.functions import col, split, substring_index, when
# Add a column indicating whether an individual is older than 21, named "is_adult"
df_spaghetti_0 = df.withColumn("is_adult", when(col("age") > 21, True).otherwise(False)) \
    .select("name", "age", "email", "is_adult")
# Extract the first name and last name from the "name" column
df_spaghetti_1 = df_spaghetti_0.withColumn("first_name", split(col("name"), " ")[0]) \
.withColumn("last_name", split(col("name"), " ")[1]) \
.drop("name")
# Extract the domain from the email addresses
df_spaghetti_2 = df_spaghetti_1.withColumn("domain", substring_index(split(col("email"), "@")[1], ".", 1)) \
.drop("email")
This code is difficult to understand and maintain due to its lack of structure and organization. We’ll now refactor it step by step to improve its readability and modularity.
Refactoring with Function Decomposition
Our first step in refactoring the code is to break down the DataFrame transformations into separate functions. This allows us to isolate each transformation and make the code more modular. Here’s how we can refactor the code using function decomposition:
from pyspark.sql import DataFrame
def add_is_adult_column(df: DataFrame) -> DataFrame:
"""
Add a column indicating whether an individual is older than 21, named "is_adult".
"""
return df.withColumn("is_adult", when(col("age") > 21, True).otherwise(False))
def extract_first_and_last_name(df: DataFrame) -> DataFrame:
"""
Extract the first name and last name from the "name" column.
"""
return df.withColumn("first_name", split(col("name"), " ")[0]) \
.withColumn("last_name", split(col("name"), " ")[1]) \
.drop("name")
def extract_domain(df: DataFrame) -> DataFrame:
"""
Extract the domain from the email addresses.
"""
return df.withColumn("domain", substring_index(split(col("email"), "@")[1], ".", 1)) \
.drop("email")
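A side benefit of this decomposition is that each function can be unit-tested in isolation with a tiny DataFrame. Here is a minimal pytest-style sketch (the spark fixture and the sample row are illustrative, not part of the pipeline above):
def test_add_is_adult_column(spark):
    source = spark.createDataFrame([("Jeanne Doe", 30, "jeanne@example.com")], ["name", "age", "email"])
    result = add_is_adult_column(source)
    # A 30-year-old should be flagged as an adult
    assert result.first()["is_adult"] is True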
Let’s now use .transform() to chain our transformations:
df_transformed = df \
.transform(add_is_adult_column) \
.transform(extract_first_and_last_name) \
.transform(extract_domain)
Now, we’ve created a clear and readable pipeline for processing our DataFrame. Each transformation function is applied sequentially, resulting in a streamlined and understandable flow of operations.
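For reference, running the chain on our sample data produces the following columns and values (reproduced here as comments; the exact console formatting of show() may differ):
df_transformed.show()
# age | is_adult | first_name | last_name | domain
# 20  | false    | François   | Dupont    | wanadoo
# 25  | true     | Zinedine   | Zidane    | goat
# 30  | true     | Napoleon   | Bonaparte | laposte
# 18  | false    | Louise     | Parker    | yahoo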
Parameterization with Currying
Finally, let’s enhance our transformations with parameterization, enabling us to customize their behavior based on input parameters.
Imagine we need to parameterize the age threshold for determining adulthood:
def add_is_adult_column(df: DataFrame, threshold: int) -> DataFrame:
"""
Add a column indicating whether an individual is older than a threshold, named "is_adult".
"""
return df.withColumn("is_adult", when(col("age") > threshold, True).otherwise(False))
However, in order to use it within a .transform(), the function must take a DataFrame as its only argument (whereas ours now also takes a threshold).
Currying is a functional programming technique that turns a function taking multiple arguments into a sequence of functions that each take a single argument. We’ll use it to make sure that whatever is passed to .transform() is a function with a DataFrame as input and output:
from typing import Callable
def add_is_adult_column(threshold: int) -> Callable[[DataFrame], DataFrame]:
"""
Return a callable function to add a column indicating whether an individual is older than the given threshold, named "is_adult".
"""
def inner(df: DataFrame) -> DataFrame:
return df.withColumn("is_adult", when(col("age") > threshold, True).otherwise(False))
return inner
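Calling add_is_adult_column(threshold=21) no longer returns a DataFrame: it returns the inner function, which we can apply ourselves or hand to .transform(). A quick illustration (the variable names below are just for demonstration):
make_adult_flag = add_is_adult_column(threshold=21)  # a function from DataFrame to DataFrame
df_with_flag = make_adult_flag(df)                   # equivalent to df.transform(make_adult_flag)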
Now we can use it within the .transform():
df_transformed = df.transform(add_is_adult_column(threshold=21)) \
.transform(extract_first_and_last_name) \
.transform(extract_domain)
Each transformation function can now be customized with specific parameters, making our data processing pipeline more versatile.
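As a side note, if you would rather keep the two-argument version of the function (DataFrame plus threshold), functools.partial from the standard library achieves the same result by binding the threshold in advance. This is an alternative sketch, not part of the refactoring above (the _v2 name is only there to avoid clashing with the curried version):
from functools import partial

def add_is_adult_column_v2(df: DataFrame, threshold: int) -> DataFrame:
    # Same logic as before, but with the threshold passed as a regular argument
    return df.withColumn("is_adult", when(col("age") > threshold, True).otherwise(False))

df_transformed = df.transform(partial(add_is_adult_column_v2, threshold=21)) \
    .transform(extract_first_and_last_name) \
    .transform(extract_domain)
From Spark 3.3 onwards, .transform() can also forward extra arguments directly, e.g. df.transform(add_is_adult_column_v2, threshold=21), achieving the same effect without currying or partial.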
Conclusion
In this blog post, we’ve demonstrated how to enhance Spark DataFrame transformations using functional programming concepts in Python. By refactoring our code with function decomposition, parameterization, and currying, we’ve transformed a messy and unreadable data processing pipeline into a clean, modular and versatile solution.
By adopting these techniques, you can improve the maintainability, scalability, and readability of your Spark applications, making them easier to understand and maintain in the long run.
Happy coding!