After spending countless hours working with Spark, I’ve compiled some tips and tricks that have helped me improve my productivity and performance. Today, I’ll share some of my favorite ones with you.
1. Measuring DataFrame Size in Memory
When working with Spark, knowing how much memory your DataFrame uses is crucial for optimization. This is especially important since compressed formats like Parquet can make memory usage unpredictable - a small file on disk might expand significantly when loaded into memory.
Tracking memory usage helps prevent out-of-memory errors, enables accurate cluster sizing, and informs caching decisions. Moreover, when your Spark job is failing or running slower than expected, memory metrics can help identify issues and guide you towards appropriate solutions.
To measure your DataFrame’s memory usage, follow these steps:
- First, cache your DataFrame in memory:
df.cache()
- Run an action (like count()) to materialize the cache:
df.count()
- Open the Spark UI and navigate to the “Storage” tab.
Here you'll find not only the memory usage of the DataFrame (Size in Memory) but also detailed information about:
- Storage level: in memory, disk, etc.
- Cached partitions: number of partitions that are cached
- Fraction cached: percentage of data that is cached
- Size on disk: total disk space used by the DataFrame
If you only need to measure the memory usage of a DataFrame and/or you want to do it programmatically, you can do it like this:
size_bytes = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf)
This method uses the SizeEstimator class from the Spark JVM to estimate the size of the DataFrame in bytes, which you can then convert to a more readable format (e.g., MiB, GiB).
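For example, here's a small sketch of such a conversion (assuming the size_bytes value from the snippet above):
def human_readable(num_bytes: float) -> str:
    """Format a byte count as B, KiB, MiB or GiB."""
    for unit in ["B", "KiB", "MiB"]:
        if num_bytes < 1024:
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1024
    return f"{num_bytes:.1f} GiB"

print(f"Estimated DataFrame size: {human_readable(size_bytes)}")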
2. NOOP Write Operations for Testing
Sometimes you need to test your Spark pipeline without actually writing data to disk. This is particularly useful when:
- Benchmarking transformation performance
- Debugging complex pipelines
- Validating execution plans
The “noop” (no operation) format is perfect for these scenarios. It triggers the computation of the entire transformation pipeline (select, join, groupBy, agg) without writing any output:
from pyspark.sql import functions as F

# Your transformation pipeline
transformed_df = (
    df.select("col1", "col2")
    .join(other_df, "key")
    .groupBy("col1")
    .agg(F.sum("value").alias("total"))
)

# Trigger computation without writing
transformed_df.write.format("noop").mode("overwrite").save()
A quick look at the Spark UI will show you that the transformations are executed but no files are written to disk.
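Since no output is produced, the noop sink is also a convenient way to time a pipeline in isolation. Here's a minimal sketch, reusing the transformed_df from above:
import time

# Time the full pipeline end to end, without paying any write cost
start = time.perf_counter()
transformed_df.write.format("noop").mode("overwrite").save()
elapsed = time.perf_counter() - start

print(f"Pipeline executed in {elapsed:.2f}s (no files written)")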
3. Handling Duplicate Column Names in Joins
This tip is particularly special to me as I discovered it in my very early days while reading the famous “Spark: The Definitive Guide” by Bill Chambers and Matei Zaharia. The first time I encountered that dreaded “ambiguous column” error in a join, I immediately knew where to look and how to fix it.
When joining DataFrames, you might encounter duplicate column names in two common scenarios:
- The join key appears in both DataFrames with the same name
- Non-join columns have the same name in both DataFrames
I’m only going to focus on the first scenario as it’s the most common in my opinion.
Let’s see this problematic situation in practice:
df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "col1"])
df2 = spark.createDataFrame([(1, "X"), (2, "Y")], ["id", "col2"])
# Join using a boolean condition
joined_df = df1.join(df2, df1.id == df2.id)
joined_df.printSchema()
# Output:
# root
# |-- id: long (nullable = true)
# |-- col1: string (nullable = true)
# |-- id: long (nullable = true)
# |-- col2: string (nullable = true)
Note that there are two columns named id in the resulting DataFrame, even though we joined on that column.
The error is raised when we try to select the id column:
joined_df.select("id").show()
# Error:
# AnalysisException: [AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].
My favorite solution is to use a string join expression instead of a boolean condition. This automatically removes the duplicate column:
# Clean and simple solution
safe_joined_df = df1.join(df2, "id")
safe_joined_df.printSchema()
# Output:
# root
# |-- id: long (nullable = true)
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
So now we can select the id column without any issue:
safe_joined_df.select("id").show()
# Output:
# +---+
# | id|
# +---+
# | 1|
# | 2|
# +---+
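If you ever do need to keep the boolean-condition join (for instance, to retain both id columns), another option is to disambiguate by referencing the column through its parent DataFrame instead of by name. A quick sketch:
# Keep the boolean-condition join but select the column through its parent DataFrame
joined_df = df1.join(df2, df1.id == df2.id)

# df1["id"] is unambiguous, so this no longer raises AMBIGUOUS_REFERENCE
joined_df.select(df1["id"], "col1", "col2").show()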
4. Parameterizing Transformations: The Simple Way
In my last job, we were working with an older version of Spark in an on-premise environment. Like many data teams stuck with legacy systems, we had our ways of doing things - including using currying for DataFrame transformations.
I was so comfortable with this pattern that I wrote a blog post, Mastering chained transformations in Spark, where I presented currying (a technique where a function that takes multiple arguments is transformed into a chain of single-argument functions) as the elegant solution for parameterizable transformations.
Let me share this evolution through a real example. Here's what I originally advocated for, using currying:
from typing import Callable

from pyspark.sql import DataFrame
from pyspark.sql.functions import when, col


def add_is_adult_column(threshold: int) -> Callable:
    """
    Return a callable function to add a column indicating whether an individual
    is older than the given threshold, named "is_adult".
    """
    def inner(df: DataFrame) -> DataFrame:
        return df.withColumn(
            "is_adult",
            when(col("age") > threshold, True).otherwise(False)
        )

    return inner


# Usage with currying
df_transformed = df.transform(add_is_adult_column(threshold=21))
But since Spark 3.3.0, the transform method accepts additional keyword arguments (kwargs) that are passed directly to the transformation function. This makes the code cleaner and easier to understand:
def add_is_adult_column(df: DataFrame, threshold: int) -> DataFrame:
    """
    Add a column indicating whether an individual is older than a threshold.
    """
    return df.withColumn(
        "is_adult",
        when(col("age") > threshold, True).otherwise(False)
    )
# Usage with kwargs - the simpler way
df_transformed = df.transform(add_is_adult_column, threshold=21)
ET VOILÀ! 🎩
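The kwargs form also keeps chained pipelines tidy, since every step stays a plain function with its parameters right next to it. A quick sketch, where the second function, add_age_group, is just a hypothetical helper for illustration:
def add_age_group(df: DataFrame, bin_size: int) -> DataFrame:
    """Hypothetical helper: bucket ages into groups of `bin_size` years."""
    return df.withColumn("age_group", (col("age") / bin_size).cast("int") * bin_size)


# Each step is parameterized inline, no currying needed
df_transformed = (
    df.transform(add_is_adult_column, threshold=21)
      .transform(add_age_group, bin_size=10)
)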
5. Using Decorators to Check DataFrame Column Types
One of the most common sources of runtime errors in Spark applications is mismatched column types. These issues can be particularly frustrating because they might only surface after processing significant amounts of data. Here’s a helpful decorator pattern I use to validate DataFrame schemas before executing transformations:
from functools import wraps
from typing import Dict

from pyspark.sql import DataFrame
from pyspark.sql.types import DataType


def validate_columns(required_columns: Dict[str, DataType]):
    """
    Decorator to validate DataFrame columns and their types.

    Args:
        required_columns: Dictionary of column names and their expected types
    """
    def decorator(func):
        @wraps(func)
        def wrapper(df: DataFrame, *args, **kwargs):
            # Get actual schema as dictionary
            actual_schema = {field.name: field.dataType for field in df.schema}

            # Check for missing columns
            missing_cols = set(required_columns.keys()) - set(actual_schema.keys())
            if missing_cols:
                raise ValueError(f"Missing required columns: {missing_cols}")

            # Check column types
            type_mismatches = []
            for col_name, expected_type in required_columns.items():
                if actual_schema[col_name] != expected_type:
                    type_mismatches.append(
                        f"{col_name}: expected {expected_type}, got {actual_schema[col_name]}"
                    )
            if type_mismatches:
                raise TypeError(f"Column type mismatches: {', '.join(type_mismatches)}")

            return func(df, *args, **kwargs)
        return wrapper
    return decorator
# Usage example
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType


@validate_columns({
    "age": IntegerType(),
    "name": StringType()
})
def process_user_data(df: DataFrame) -> DataFrame:
    return df.withColumn("is_adult", F.col("age") >= 18)


# This will raise an error if the DataFrame doesn't have the correct schema
result_df = process_user_data(users_df)
This acts as a simple sanity check to make sure that the DataFrame has the correct schema before processing it.
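To see the guard in action, here's a sketch that feeds the decorated function a DataFrame whose age column has the wrong type (the sample data is made up for illustration):
# age arrives as a string instead of an integer (e.g. a CSV read without a schema)
bad_df = spark.createDataFrame([("29", "Alice"), ("17", "Bob")], ["age", "name"])

try:
    process_user_data(bad_df)
except TypeError as exc:
    print(exc)  # prints the decorator's mismatch message for the age column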
Conclusion
These patterns have helped me write more maintainable Spark code, and I’m sharing them hoping they might be useful to others.
If you have other patterns or approaches you’ve found valuable, I’d love to hear about them. Happy Sparking! 🚀