Apache Spark 3.0 AQE

Aparup Chatterjee
5 min read · Jul 1, 2021


Newly Added Features in Spark 3.0

Spark 3.0.0 was released in June 2020.

With the release of Spark 3.0, many improvements were implemented for faster execution. Several of these changes improve SQL performance, such as:

  • Adaptive Query Execution (AQE)
  • New EXPLAIN Format
  • Dataframe tail function
  • Join Hints
  • Dynamic Partition Pruning

Source: Spark+AI Summit Europe 2019, Spark 3.0 official docs & Google Search

In today’s session I will cover the first 3 features, and the rest I will continue in my next session.

GCP-based Big Data component details used:

Spark 3.0 based Environment Details:

  • Hadoop 3.2
  • Spark 3.0
  • Python 3.7.4

Spark 2.0 based Environment Details:

  • Hadoop 2.9
  • Spark 2.3
  • Python 2.7.14

Adaptive Query Execution (AQE)

Spark Catalyst is one of the most important layers of Spark SQL; it does all the query optimization.
Even though Catalyst does a lot of heavy lifting, it all happens before query execution. That means once the physical plan is created and execution of the plan has started, no further optimization takes place. So Catalyst cannot apply optimizations that depend on metrics observed while execution is in progress.
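To make this concrete, the plan Catalyst produces can be inspected before anything runs. A minimal sketch, assuming an active SparkSession named spark (the range data and the bucket column are purely illustrative):

from pyspark.sql import functions as F

# explain() prints the physical plan Catalyst fixed at planning time;
# nothing in it depends on metrics gathered during execution
df = spark.range(1000).withColumn("bucket", F.col("id") % 10)
df.groupBy("bucket").count().explain()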

In 3.0, Spark introduced an additional layer of optimization, known as Adaptive Query Execution (AQE). This layer tries to optimize queries based on the metrics that are collected during execution.

Adaptive Query Execution (AQE) is a layer on top of Catalyst that modifies the Spark plan on the fly. This allows Spark to do some things that are not possible in Catalyst today.

Adaptive Number of Shuffle Partitions or Reducers

In Spark SQL, the number of shuffle partitions is set using spark.sql.shuffle.partitions, which defaults to 200. In most cases this number is too high for smaller data and too low for bigger data. Selecting the right value is always tricky for the developer.

So we need the ability to coalesce the shuffle partitions by looking at the mapper output. If the map stage generates a small number of non-empty partitions, we want to reduce the overall shuffle partitions to improve performance.
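For context, this is the static knob developers tune today (200 is the documented default):

# Pre-AQE approach: one fixed shuffle-partition count for every shuffle
# in the job, regardless of how much data each stage actually produces
spark.conf.set("spark.sql.shuffle.partitions", "200")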
Shuffle Partitions without AQE:

Before we see how to optimize the shuffle partitions, let’s look at the problem we are trying to solve, using the example below.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Adaptive Query Execution") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

# Read a small CSV file and force 500 partitions
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://aparup-files/sales.csv") \
    .repartition(500)

In the above code, I am reading a small file (file size: 226 B) and repartitioning it to 500 partitions. This increase is to force Spark to use the maximum number of shuffle partitions.
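As a quick sanity check (an extra line, not in the original run), the forced partition count can be verified before the shuffle:

# Verify the partition count produced by repartition(500)
print(df.rdd.getNumPartitions())  # expected: 500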

df.show(4, False)

# groupBy to force a shuffle
df.groupBy("customerId").count().count()

# sales_df = df.groupBy("customerId").count()
# sales_df.write.parquet("gs://aparup-files/spark2.parquet")

sc.stop()

Observing the job: Spark 2 doesn’t have AQE
When I run this on the Spark 2 cluster, it throws an error: AQE is set to false by default, and to use it we need to enable spark.sql.adaptive.coalescePartitions.enabled, which determines the required partitions from runtime metrics, and that configuration is not present in Spark 2.

Spark 2 without AQE enabled

Spark 3 with AQE enabled

Spark 2 Observing Stages

As you can observe from the image, in stage id 14, 200 tasks ran even though the data was very small.

Spark 2 Observing DAGs

From the image, you can observe that there was a lot of shuffle.

Optimizing Shuffle Partitions in AQE

Enabling the configuration

To use AQE we need to set spark.sql.adaptive.enabled to true.

conf.set("spark.sql.adaptive.enabled", "true")

To use the shuffle partitions optimization we need to set spark.sql.adaptive.coalescePartitions.enabled to true.

conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
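Putting both settings together, the session from the earlier example can be started with AQE on. A minimal sketch (the app name is illustrative):

from pyspark.sql import SparkSession

# Spark 3: enable AQE and runtime coalescing of shuffle partitions
spark = SparkSession.builder \
    .appName("Spark AQE demo") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()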

Spark 3 Observing Stages

From the image you can observe that most of the stages are skipped altogether, as Spark figured out that most of the partitions are empty.

Spark 3 Observing DAGs

From the image, you can observe that most of the shuffle was skipped. There is a CoalescedShuffleReader which combines all the shuffle partitions into 1.

So just by enabling a few configurations, we can have AQE dynamically optimize the shuffle partitions.
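For finer control, Spark 3.0 also exposes knobs for the coalescing behavior. A minimal sketch (the values shown are illustrative; 64MB is the documented default advisory size):

# Target size AQE aims for when coalescing shuffle partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
# Lower bound on the number of partitions after coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")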

New EXPLAIN Format

In Spark, the EXPLAIN function returns the details of the Spark SQL query execution stages, i.e. how the query is optimized.

Challenge in Spark 2 — it is not easy to understand how a query is optimized, i.e. the output is too complex.

Key feature of the Explain function in Spark 3:

EASY TO READ QUERY EXECUTION PLAN by adding explain mode="formatted"

query = "select customerId, max(amountPaid) from spark3.sample_tbl where customerId > 0 group by customerId having max(amountPaid) > 0"

Explain in Spark 2

Not easy to understand how a query is optimized

Output is too complex!

Explain in Spark 3

Easy to Read Query Plan

Output with Very Detailed Information
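For reference, this is how the formatted mode is invoked in PySpark 3. A minimal sketch, assuming the spark3.sample_tbl table from the query above exists:

# mode="formatted" (new in Spark 3) prints a concise plan tree
# followed by a numbered node-details section
spark.sql(query).explain(mode="formatted")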

DataFrame tail function

Many times in our code, we would like to read a few rows from a DataFrame.

For this, we use the head function on the DataFrame, which is internally implemented by reading only the needed number of rows, accessing one partition at a time from the beginning.

But up to Spark 2, there was no straightforward way to access the values from the last partition of a DataFrame.
So in Spark 3, a new function, tail, has been introduced for reading values from the last partition of a DataFrame.

Spark 2 doesn’t have a tail function
Spark 3 introduced the new tail function
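A minimal sketch of the two APIs side by side, assuming an active session and the sales DataFrame df from earlier (the row count is illustrative):

# head(n) reads only the needed rows, scanning partitions from the front
first_rows = df.head(4)   # returns a list of Row objects

# tail(n), new in Spark 3, reads rows from the last partition(s)
last_rows = df.tail(4)    # returns a list of Row objects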
