Adaptive Query Execution (AQE) is a groundbreaking feature introduced in Spark 3.0 that dynamically optimizes query performance at runtime. By utilizing real-time statistics, AQE can adjust query plans based on the actual data characteristics encountered during execution, leading to more efficient and faster query processing.

In this blog, I will explore the practical applications of AQE, demonstrating its benefits and capabilities. To illustrate these concepts, I will use Microsoft Fabric notebooks running on runtime 1.2 (Spark 3.4) within a single-node cluster configuration with 8 executors.

Through these examples, you will gain a deeper understanding of how AQE can significantly enhance your Spark query performance.

Definitions

Let’s start by defining key terms that will be referenced throughout this blog:

Partition: Small segment of data from a larger dataset. It’s the basic unit of parallelism that allows Spark to distribute processing tasks across multiple nodes in a cluster. Each partition is processed independently by a single task.

Stage: Set of tasks that perform a specific computation. Stages are separated by operations that require data shuffling across the network, like aggregations or joins.

Data Skew: Occurs when some partitions hold significantly more data than others.

AQE Framework and Workflow

AQE operates by inserting optimization steps at key points in the query execution flow, particularly at shuffle boundaries (i.e., the end of a stage), which naturally break the pipeline and materialize intermediate results. By doing so, AQE can gather runtime statistics and adjust the query plan accordingly without disrupting the overall execution flow.

🚙 Route Optimization for SQL Queries
Let’s visualize how AQE works with an analogy: imagine you’re embarking on a road trip. At each new destination, you monitor traffic conditions and carefully choose your route based on the information you’ve collected, always aiming to reach your destination as efficiently as possible. Similarly, AQE gathers information and fine-tunes its execution plan at every stage of query execution to ensure optimal performance!

Here’s a high-level overview of AQE’s workflow:

AQE Workflow

Practically, this is the step-by-step process that occurs:

  1. Kick-off Leaf Stages: Start executing stages that do not depend on the input of other stages.
  2. Perform Optimizations: Upon stage completion, gather runtime statistics and re-optimize the query plan if necessary.
  3. Start Dependent Stages: Trigger execution of stages that now have their dependencies resolved.
  4. Repeat: Continue this cycle until all stages are complete.
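A quick, hands-on way to see this deferred planning from a notebook: under AQE, every physical plan is wrapped in an AdaptiveSparkPlan node, and the plan is only finalized once the query has actually run. The snippet below is a throwaway sketch (the DataFrame is not one of the tables used later), just to illustrate the before/after:

from pyspark.sql import functions as F

# Any query with a shuffle (here, a groupBy) will do for this illustration.
demo_df = (
    spark.range(10_000_000)
         .withColumn("bucket", F.col("id") % 100)
         .groupBy("bucket")
         .count()
)

demo_df.explain()  # AdaptiveSparkPlan with isFinalPlan=false: nothing has executed yet
demo_df.collect()  # running the query lets AQE re-optimize at each shuffle boundary
demo_df.explain()  # the same call now shows the re-optimized plan (isFinalPlan=true)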

Major Optimizations in AQE

Adaptive Query Execution (AQE) introduces three key optimizations:

  • Dynamic Coalescing of Shuffle Partitions
  • Dynamic Join Strategy Switching
  • Dynamic Skew Join Handling

I will illustrate these mechanisms using the Spark SQL/DataFrame API and examine the Spark UI to understand what happens behind the scenes. I’ll be using the following artificial datasets:

%%sql
-- Create "products" table.
CREATE TABLE products
USING parquet
AS
SELECT 
id AS p_product_id
,CAST(rand() * 2000 AS INT) AS p_cost
FROM RANGE(30000000);

-- Create "transactions" table with skew.
-- 80% of all transactions involve the product with id 200.
CREATE TABLE transactions
USING parquet
AS
SELECT 
CASE 
    WHEN rand() < 0.8 THEN 200 
    ELSE CAST(rand() * 30000000 AS INT) 
END AS t_product_id
,CAST(rand() * 200 AS INT) AS t_quantity
,DATE_ADD(current_date(), - CAST(rand() * 500 AS INT)) AS t_date
FROM RANGE(1000000000);
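Before moving on, here is an optional sanity check I like to run to confirm the skew we just baked into transactions; product 200 should account for roughly 80% of the rows:

# Count the most frequent products; expect roughly 800M rows for t_product_id = 200.
skew_check = spark.sql("""
    SELECT t_product_id, COUNT(*) AS cnt
    FROM transactions
    GROUP BY t_product_id
    ORDER BY cnt DESC
    LIMIT 5
""")
display(skew_check)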

Last but not least, let’s make sure AQE is enabled, in case it isn’t already (it has been on by default since Spark 3.2):

spark.conf.set("spark.sql.adaptive.enabled", "true")

1. Dynamic Coalescing of Shuffle Partitions

A balancing act

When Spark performs an operation requiring data movement across nodes (for example, an aggregation), it carries out a cross-network shuffle. As a result, data is redistributed across the nodes of the cluster into a fixed number of partitions.  

The number of shuffle partitions is an important parameter because it directly impacts query performance:

  • Over-partitioning leads to small partitions, causing excessive I/O and high overhead.
  • Under-partitioning results in large partitions that may cause memory pressure and disk spilling.

Before AQE, Spark used a static number of shuffle partitions throughout the entire query execution, set to 200 by default. This parameter had to be tuned manually, based on factors such as the size of the cluster or of the data being processed, using the following configuration: spark.conf.set("spark.sql.shuffle.partitions", 50).
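To make that manual approach concrete, here is the kind of back-of-the-envelope calculation it required. The sizes below are made-up assumptions purely for illustration (the set call is commented out so the later examples still run with the default of 200):

# Pre-AQE: pick one static value up front for the whole query, typically by
# dividing the expected shuffle size by a target partition size.
expected_shuffle_bytes = 20 * 1024**3      # assume ~20 GiB will be shuffled
target_partition_bytes = 128 * 1024**2     # aim for ~128 MiB per partition

num_partitions = max(1, expected_shuffle_bytes // target_partition_bytes)
print(f"shuffle partitions to configure: {num_partitions}")  # 160 with these assumptions
# spark.conf.set("spark.sql.shuffle.partitions", int(num_partitions))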

⛔ Having a static number of shuffle partitions leads to suboptimal performance because data size changes throughout query execution

How Dynamic Coalescing Works

AQE dynamically adjusts the number of shuffle partitions based on real-time statistics gathered during query execution. It does this by merging (coalescing) small partitions into larger ones, ensuring that partition sizes are optimized for the next stage of the query.
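Coalescing is controlled by a handful of settings. The three below are the ones I’d look at first; the values shown are the documented defaults for Spark 3.4, so setting them explicitly changes nothing and is purely illustrative:

# Coalescing is on by default whenever AQE itself is enabled.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Target size AQE aims for when merging small shuffle partitions (64MB is the default).
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

# Lower bound on the size of a coalesced partition (1MB is the default).
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")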

🛠️ Example

Let’s see how this works in practice. Consider the following query:

# Calculate the total quantity of product sold on each date & order by quantity
df = spark.sql("""
 SELECT 
 t_date
 ,SUM(t_quantity) AS total_quantity
 FROM transactions
 GROUP BY t_date
 ORDER BY total_quantity DESC
""")

display(df)

By examining the Spark UI, we can observe the following:

  1. The query plan specifies 200 shuffle partitions, as that is the default setting.
  2. Runtime statistics show an average partition size of 15.7 KiB (too small).
  3. AQE coalesces (merges) the partitions into one larger partition.

AQE Spark UI coalesce
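If you’d rather confirm this from the notebook than from the UI, a rough check (it assumes the coalesced exchange is what feeds the final result) is to look at the partition count of the materialized DataFrame:

# With AQE coalescing, this should report a small number of partitions
# (often 1 for a result this tiny) instead of the static default of 200.
print(df.rdd.getNumPartitions())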

2. Dynamic Join Strategy Switching

How does Spark perform joins?

At a high level (I’m simplifying a lot here), Spark joins data using either a shuffle join or a broadcast join strategy. In a shuffle join, used for big-to-big table joins, all nodes exchange data with each other based on the join keys, which can be slow and network-heavy. For big-to-small table joins, Spark can use a broadcast join instead, where the small table is copied to all worker nodes before the join. This allows each node to perform the join on its own without further communication, making the process quicker and more efficient.
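To make the two strategies concrete, here is how you would pick them yourself with the DataFrame API, using the tables created earlier. broadcast() is the standard PySpark hint; the rest is a sketch for illustration:

from pyspark.sql.functions import broadcast

transactions = spark.table("transactions")
products = spark.table("products")

# Shuffle join: the default for two large tables; both sides are repartitioned
# across the network on the join key.
shuffle_joined = transactions.join(
    products, transactions.t_product_id == products.p_product_id
)

# Broadcast join: the small (filtered) side is copied to every executor, so the
# large transactions table never needs to be shuffled for the join.
small_products = products.filter("p_cost < 10")
broadcast_joined = transactions.join(
    broadcast(small_products),
    transactions.t_product_id == small_products.p_product_id,
)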

AQE for Join Optimization

AQE enhances join performance by dynamically adjusting execution strategies based on real-time metrics. Initially, Spark might choose a join strategy based on estimates, but AQE refines this during execution by analyzing actual data statistics. For instance, if Spark initially selects a shuffle join for a big-to-big table join but detects that one table is smaller than expected (below spark.sql.autoBroadcastJoinThreshold), it can switch to a broadcast join mid-execution.
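The size thresholds behind this decision are configurable. The two settings below are the relevant ones as far as I know (the second exists since Spark 3.2 and, when unset, falls back to the first); the 50MB value is just an example, not a recommendation:

# Threshold used at static planning time (default 10MB); tables estimated below
# this size make Spark pick a broadcast join up front.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Threshold AQE uses at runtime when deciding whether to switch to a broadcast
# join; example value only.
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "50MB")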

🛠️ Example

Practice time! Let’s consider the following query:

# Calculate the total sales amount per date for each product costing less than 10
df = spark.sql("""
 SELECT
 t.t_date AS sales_date
 ,p.p_product_id
 ,SUM(t.t_quantity * p.p_cost) AS total_sales
 FROM transactions t

 JOIN products p 
 ON t.t_product_id = p.p_product_id
 WHERE p.p_cost < 10
 GROUP BY t.t_date, p.p_product_id;
""")

See what I did there? The filter on the price p.p_cost < 10 will reduce the size of the products table, making it a big-to-small table join.  

However, this is not known at static planning time, so the initial plan opts for a SortMergeJoin (9), a type of shuffle join (the expensive one), as you can see in the breakdown of the query execution:

df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (12)
+- HashAggregate (11)
   +- Project (10)
      +- SortMergeJoin Inner (9)
         :- Sort (4)
         :  +- Exchange (3)
         :     +- Filter (2)
         :        +- Scan parquet spark_catalog.storage.transactions (1)
         +- Sort (8)
            +- Exchange (7)
               +- Filter (6)
                  +- Scan parquet spark_catalog.storage.products (5)

Let’s collect the DataFrame and check out the Spark UI. We can see that:

  1. Runtime exchange statistics show that the products table is small (4.6 MiB).
  2. The join strategy is dynamically switched to a BroadcastHashJoin.

AQE Spark UI join
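The Spark UI isn’t the only way to confirm the switch: since AQE keeps the re-optimized plan on the DataFrame, re-running explain() after the query has executed should now show a BroadcastHashJoin in the final plan (marked isFinalPlan=true) instead of the SortMergeJoin above:

# Same call as before, but executed *after* the query has run:
# the final adaptive plan reflects the runtime switch to a broadcast join.
df.explain(mode="formatted")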

⚠️ Note
While AQE in Spark brings dynamic optimizations, it may not always select broadcast joins automatically. Consider using broadcast join hints if you have a good understanding of your query’s characteristics and data distribution:

%%sql

SELECT /*+ BROADCAST(p) */
t.t_date AS sales_date
,p.p_product_id
,SUM(t.t_quantity * p.p_cost) AS total_sales
FROM transactions t

JOIN products p 
ON t.t_product_id = p.p_product_id
WHERE p.p_cost < 10
GROUP BY t.t_date, p.p_product_id;

3. Dynamic Skew Join Handling

Divide and Conquer

Data skew occurs when some partitions are significantly larger than others, causing performance bottlenecks: the bigger the partition, the longer it takes to process it.  

AQE employs a “divide and conquer” strategy to address data skewness during joins: it detects skewed partitions at runtime and splits them into smaller sub-partitions to balance the load, eliminating long-running tasks and reducing disk spilling.

To detect whether a partition is skewed, Spark uses two configuration parameters along with the median partition size. Specifically, a partition is considered skewed only if both of the following hold (see the sketch below):

  • The partition size is greater than skewedPartitionFactor times the median partition size.
  • The partition size exceeds skewedPartitionThresholdInBytes.  
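In case it helps, here is a minimal sketch of that rule in plain Python. The function mirrors the two conditions above; the partition sizes and the defaults quoted in the call (factor 5.0, threshold 256MB) are illustrative, so check them against your runtime:

def is_skewed(partition_size: int, median_size: int,
              factor: float, threshold_bytes: int) -> bool:
    """A partition is skewed only if it is BOTH `factor` times larger than the
    median partition AND larger than the absolute byte threshold."""
    return partition_size > factor * median_size and partition_size > threshold_bytes

# Illustrative partition sizes in bytes: three ~90 MiB partitions and one 1.5 GiB outlier.
sizes = [80 * 2**20, 90 * 2**20, 95 * 2**20, 1536 * 2**20]
median = sorted(sizes)[len(sizes) // 2]

print([is_skewed(s, median, factor=5.0, threshold_bytes=256 * 2**20) for s in sizes])
# -> [False, False, False, True]: only the outlier is treated as skewed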

🛠️ Example

For demonstration purposes, we can adjust some of the skew join parameters to intentionally cause Spark to recognize partitions as skewed. The following configuration settings will be applied:

spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 2)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100M")

Next, let’s run the following query:

# Calculate the total sales amount grouped by date & ordered by total sales
df = spark.sql("""
 SELECT
 t.t_date AS sales_date
 ,SUM(t.t_quantity * p.p_cost) AS total_sales
 FROM transactions t

 JOIN products p 
 ON t.t_product_id = p.p_product_id
 GROUP BY t.t_date
 ORDER BY total_sales DESC
""")
 
display(df)

By examining the Spark UI, we can observe the following:

  1. Runtime statistics reveal a skewed partition.
  2. Spark splits the skewed partition into smaller sub-partitions.
  3. The SortMergeJoin is marked with a skew join flag.

AQE Spark UI skew join

Performance Comparison: AQE Enabled vs. Disabled

Let’s demonstrate the time gains achieved with AQE. We’ll run a query twice: once with AQE enabled and once with AQE disabled, and then compare the execution times.

AQE Enabled

# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 2)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100M")

# Run the query and measure execution time
import time

start_time = time.time()

df = spark.sql("""
SELECT
t.t_date AS sales_date
,p.p_product_id
,SUM(t.t_quantity * p.p_cost) AS total_sales
FROM transactions t

JOIN products p 
ON t.t_product_id = p.p_product_id
WHERE p.p_cost < 10
GROUP BY t.t_date, p.p_product_id
ORDER BY total_sales DESC;
""")

display(df)

aqe_enabled_time = time.time() - start_time

AQE Disabled

# Disable AQE
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Run the query and measure execution time
start_time = time.time()

df = spark.sql("""
SELECT
t.t_date AS sales_date
,p.p_product_id
,SUM(t.t_quantity * p.p_cost) AS total_sales
FROM transactions t

JOIN products p 
ON t.t_product_id = p.p_product_id
WHERE p.p_cost < 10
GROUP BY t.t_date, p.p_product_id
ORDER BY total_sales DESC;
""")

display(df)

aqe_disabled_time = time.time() - start_time
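Finally, a small snippet to put the two measurements side by side; your absolute numbers will differ depending on cluster size and data layout:

# Compare the two wall-clock timings captured above.
improvement_pct = (aqe_disabled_time - aqe_enabled_time) / aqe_disabled_time * 100

print(f"AQE enabled : {aqe_enabled_time:7.1f} s")
print(f"AQE disabled: {aqe_disabled_time:7.1f} s")
print(f"Improvement : {improvement_pct:7.1f} %")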

Results

The following graph shows the performance improvements brought by AQE:

AQE perf comparison

That’s a 54% improvement in query time without any modifications to the query itself!

Conclusion

Adaptive Query Execution represents a significant advancement in Spark SQL, providing a more intelligent and responsive approach to query optimization. By leveraging runtime statistics and dynamically adjusting execution plans, AQE addresses many of the challenges associated with static query planning, leading to faster and more reliable query execution.
As data continues to grow in complexity and scale, innovations like AQE are the reason why Spark stands as the number one data processing engine!

Sources