In this post, we will learn about Adaptive Query Execution (AQE) in Apache Spark. This feature, introduced in Spark 3.0, updates your query plans based on runtime statistics.
AQE provides adaptive planning: it makes decisions based on runtime statistics, and it does so automatically at every stage boundary.
AQE Framework.
AQE is dynamic query optimisation that happens in the middle of query execution, based on runtime statistics.
The query runs until one or more stages have completed. AQE then optimises the part of the query that has not yet executed, and repeats this process for as long as there are more stages to run.
If there are no more stages to run then the process is done.
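The run, re-optimise, repeat loop described above can be sketched in plain Python. This is only an illustration of the control flow, not Spark's implementation: the stage functions, the `replan` rule, and the 1,000-row cut-off are all hypothetical names and values invented for the example.

```python
def run_adaptively(stages, reoptimize):
    """Run stages one at a time, re-planning the unexecuted part after each one."""
    remaining = list(stages)
    history = []
    while remaining:                      # done when no stages are left to run
        stage = remaining.pop(0)
        stats = stage()                   # stage boundary: runtime statistics are now known
        history.append(stats)
        remaining = reoptimize(remaining, stats)
    return history


# Hypothetical stages that simply report how many rows they produced.
def scan():
    return {"stage": "scan", "rows": 100}

def shuffle_join():
    return {"stage": "shuffle_join", "rows": 100}

def broadcast_join():
    return {"stage": "broadcast_join", "rows": 100}

def replan(remaining, stats):
    # Hypothetical rule: if the finished stage produced little data,
    # swap any planned shuffle join for a broadcast join.
    if stats["rows"] < 1000:
        return [broadcast_join if s is shuffle_join else s for s in remaining]
    return remaining


history = run_adaptively([scan, shuffle_join], replan)
executed = [h["stage"] for h in history]
```

Here the original plan was a scan followed by a shuffle join, but the stages that actually ran were the scan and a broadcast join, because the plan was revised once the scan's output size was known.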
AQE Features.
- Dynamically coalesce shuffle partitions.
- Dynamically switch join strategies.
- Dynamically optimise skew joins.
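For reference, these features map onto Spark SQL configuration keys. The sketch below collects them in a plain Python dictionary; it assumes you already have a SparkSession (called `spark` here, which is not created in this snippet), and the values shown are illustrative. Defaults differ between Spark versions, so check the documentation for the version you run.

```python
# Spark SQL configuration keys related to AQE. Values are illustrative.
aqe_settings = {
    # Master switch for Adaptive Query Execution.
    "spark.sql.adaptive.enabled": "true",
    # Dynamically coalesce shuffle partitions.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Dynamically optimise skew joins.
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Size limit (in bytes) used when deciding whether a join side can be broadcast.
    "spark.sql.autoBroadcastJoinThreshold": str(10 * 1024 * 1024),
}

# Assuming an existing SparkSession named `spark`:
# for key, value in aqe_settings.items():
#     spark.conf.set(key, value)
```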
Shuffle Partitions.
- Challenge: No single partition number suits the whole query, because the amount of data changes from one stage of execution to the next.
- Solution: Set the initial partition number high, and let AQE automatically coalesce small partitions at runtime.
As a result, AQE makes more efficient use of the partitions, which in turn speeds up our query runtime.
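To make the idea of coalescing concrete, here is a simplified greedy sketch: walk through the shuffle partitions in order and merge neighbours until adding the next one would exceed a target size. This is an assumption about the general shape of the technique, not Spark's actual algorithm, and the partition sizes and 64 MB target are made up for the example.

```python
def coalesce_partitions(sizes, target_size):
    """Greedily merge adjacent partitions so each group stays within target_size.

    A single partition that is already larger than target_size is kept on its own.
    Returns a list of groups, each group being a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)        # close the current group
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes_mb = [5, 10, 70, 3, 2, 40, 30]      # hypothetical shuffle partition sizes
groups = coalesce_partitions(sizes_mb, target_size=64)
```

With these sizes, seven small and uneven partitions become four groups, so fewer tasks are scheduled for the next stage.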
Standard Join.
- All the data is shuffled.
Let’s say you have two datasets, red and blue. The data is partitioned across our executors. If we want to join our two datasets together, in a standard join we first shuffle the data across the cluster to get all of the relevant data together (red with red, blue with blue, etc.), and then join the matching data locally on each executor.
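A toy version of this shuffle-then-join pattern is sketched below in plain Python. Rows are `(key, value)` pairs, partitions are lists, and the partition count of 4 is arbitrary. It illustrates the idea only and is not how Spark implements its joins.

```python
from collections import defaultdict


def shuffle(rows, num_partitions):
    """Route every (key, value) row to the partition chosen by its key's hash."""
    partitions = defaultdict(list)
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def shuffle_join(left, right, num_partitions=4):
    """Shuffle both sides by key, then join each partition locally."""
    left_parts = shuffle(left, num_partitions)
    right_parts = shuffle(right, num_partitions)
    joined = []
    for p in range(num_partitions):
        # Matching keys are guaranteed to be in the same partition number.
        lookup = defaultdict(list)
        for key, value in right_parts.get(p, []):
            lookup[key].append(value)
        for key, value in left_parts.get(p, []):
            for other in lookup.get(key, []):
                joined.append((key, value, other))
    return joined


orders = [("red", 1), ("blue", 2), ("red", 3)]
colours = [("red", "R"), ("blue", "B")]
result = sorted(shuffle_join(orders, colours))
```

Note that every row from both inputs passes through `shuffle`, which is the cost the bullet above refers to.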
Broadcast Join.
- Only the small data is moved; the big data is left untouched.
- It duplicates the ‘small’ data across all executors.
If one of your two datasets is significantly smaller than the other, then instead of shuffling both datasets across your workers you can broadcast an entire copy of the small dataset to every worker. This way, the join is done locally and far less data is shuffled, which is cheaper and faster.
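The same toy setup can show what changes with a broadcast join. In this sketch the big dataset is already split into partitions and never moves; only the small dataset is turned into a lookup table that every partition reads. The data and names are hypothetical.

```python
def broadcast_join(big_partitions, small_rows):
    """Join each partition of the big dataset against a full copy of the small one."""
    # Build the lookup table once; conceptually, every executor gets a copy of it.
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in big_partitions:      # the big data stays in its partition
        joined_partitions.append([
            (key, value, other)
            for key, value in partition
            for other in lookup.get(key, [])
        ])
    return joined_partitions


big = [[("red", 1), ("blue", 2)], [("red", 3)]]   # two partitions of the big dataset
small = [("red", "R"), ("blue", "B")]
result = broadcast_join(big, small)
```

The output keeps the big dataset's original partitioning, which is why no shuffle of the big side is needed.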
Join strategy.
- Challenge: Size estimates made at planning time can be wrong, so the optimiser can miss the opportunity for a broadcast join, which is only chosen when the data is under the limit that can be broadcast.
- Solution: Replan joins with runtime data sizes.
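A minimal sketch of that replanning decision follows. It assumes a single size threshold (set here to 10 MB, mirroring Spark's default `spark.sql.autoBroadcastJoinThreshold`) and made-up dataset sizes; the real optimiser weighs more factors than this.

```python
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024   # assumed limit for this example


def choose_join(left_bytes, right_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a join strategy from the sizes of the two inputs."""
    if min(left_bytes, right_bytes) <= threshold:
        return "broadcast"
    return "sort-merge"


# Planning time: the right side is estimated at 500 MB, too big to broadcast.
planned = choose_join(left_bytes=2_000_000_000, right_bytes=500_000_000)

# Runtime: after filtering, the right side turns out to be only 8 MB.
replanned = choose_join(left_bytes=2_000_000_000, right_bytes=8_000_000)
```

The function is the same in both calls; only the input changes from an estimate to a measured size, which is the whole point of replanning at runtime.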
Skew Joins.
- Challenge: Data skew can have a negative performance impact.
- Solution: Use runtime statistics to:
  - Detect skew from partition sizes.
  - Split skewed partitions into smaller subpartitions.
A stage finishes only when its slowest task finishes. If one task is very fast, it will not cause an issue. But if one task is extremely slow because its partition is much larger than the others, then we have a bottleneck. This is why data skew can have negative impacts on performance.
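Both steps, detection and splitting, can be sketched in a few lines. The rule used here (a partition is skewed if it is more than 5 times the median size and also above 256 MB) mirrors the defaults of Spark's `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `skewedPartitionThresholdInBytes` settings as we understand them; the partition sizes and the 64 MB split target are hypothetical.

```python
import math
from statistics import median


def find_skewed(sizes, factor=5, min_size=256):
    """Return the indices of partitions that are far larger than the median."""
    typical = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * typical and size > min_size]


def split_partition(size, target_size):
    """Split one oversized partition into roughly equal subpartitions."""
    pieces = math.ceil(size / target_size)
    base, extra = divmod(size, pieces)
    # The first `extra` pieces take one extra unit so the sizes add up exactly.
    return [base + 1 if i < extra else base for i in range(pieces)]


sizes_mb = [60, 64, 58, 1000, 62]          # one partition is clearly skewed
skewed = find_skewed(sizes_mb)
subpartitions = split_partition(sizes_mb[skewed[0]], target_size=64)
```

After the split, the single 1000 MB task is replaced by several tasks of about the same size as the others, so no one task holds the stage back.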
AQE Demo.
Take a look at the runtime for this query when AQE is disabled:
Now, take a look at the run time when we enable AQE:
Another thing that is significant is the number of partitions when AQE is enabled and disabled. In this example, when AQE is enabled there was only one partition in the second stage:
Comparing the above to AQE being disabled below:
We are also able to see a broadcast join at work using the Spark UI. In this example, we are joining two datasets with one having around 250,000 records and the other a little under 5,000,000 records. This is a perfect candidate for a broadcast join to prevent data skew and performance issues.
You can actually follow each dataset along its journey. Since we have AQE enabled you will also notice at the top we can see something called ‘AdaptiveSparkPlan’.
Note that this query took around 1 minute to execute.
However, since we did not provide any hints, the broadcast join was not executed. Let’s look at an example where we provide a hint:
We write the hint in what looks like a SQL comment (/*+ ... */), but Spark parses it as a hint rather than ignoring it as a comment.
We can now see that a broadcast join has been executed and that performance has improved drastically. The query ran in around 10 seconds, down from around 1 minute.
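For illustration, a query with a broadcast hint might look like the sketch below. The table names, aliases, and columns are hypothetical, since the original query is not reproduced in this post, and the snippet assumes an existing SparkSession named `spark`.

```python
# Hypothetical tables and columns; only the hint syntax is the point here.
query = """
SELECT /*+ BROADCAST(small) */ big.id, small.label
FROM big_table AS big
JOIN small_table AS small
  ON big.id = small.id
"""

# Assuming an existing SparkSession named `spark`:
# result = spark.sql(query)
# result.explain()   # look for BroadcastHashJoin in the physical plan
```

The name inside `BROADCAST(...)` refers to the table or alias that should be broadcast, so it should be the smaller side of the join.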
The takeaway from this should be that AQE is a powerful feature but we cannot always rely on it to optimise our queries correctly. You should always optimise the query yourself when it needs to be optimised.