Apache Spark 3.0 AQE
Newly Added Features in Spark 3.0
Spark 3.0.0 was released in early June 2020.
With the release of Spark 3.0, many improvements have been implemented for faster execution.
Several of the changes improve SQL performance, such as:
- Adaptive Query Execution (AQE)
- New EXPLAIN Format
- Dataframe tail function
- Join Hints
- Dynamic Partition Pruning
Sources: Spark+AI Summit Europe 2019, the Spark 3.0 official docs & Google Search
In today’s session I will cover the first three features; the rest I will continue in my next session.
Details of the GCP-based big data components used:
Spark 3.0 based Environment Details:
- Hadoop 3.2
- Spark 3.0
- Python 3.7.4
Spark 2.x based Environment Details:
- Hadoop 2.9
- Spark 2.3
- Python 2.7.14
Adaptive Query Execution (AQE)
Spark Catalyst is one of the most important layers of Spark SQL; it does all the query optimization.
Even though Catalyst does a lot of heavy lifting, it all happens before query execution. That means once the physical plan is created and execution of the plan has started, no further optimization takes place. Catalyst therefore cannot apply optimizations that depend on metrics it only sees while the execution is going on.
In 3.0, Spark has introduced an additional layer of optimization known as Adaptive Query Execution (AQE). This layer tries to optimize queries based on the metrics collected during execution.
AQE is a layer on top of Catalyst which modifies the Spark plan on the fly. This allows Spark to do some things that are not possible in Catalyst today.
Adaptive Number of Shuffle Partitions or Reducers
In Spark SQL, the number of shuffle partitions is set using spark.sql.shuffle.partitions, which defaults to 200. In most cases this number is too high for smaller data and too low for bigger data, so selecting the right value is always tricky for the developer.
So we need the ability to coalesce the shuffle partitions by looking at the mapper output. If the map stage generates only a small amount of data, we want to reduce the overall number of shuffle partitions to improve performance.
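For reference, here is a minimal sketch of how this setting is tuned by hand without AQE, assuming spark is an already-built SparkSession; the value 50 is an arbitrary illustration, not a recommendation.
print(spark.conf.get("spark.sql.shuffle.partitions"))  # "200" by default
# Every wide operation (groupBy, join, ...) will now produce 50 shuffle
# partitions, regardless of how much data actually flows through the shuffle.
spark.conf.set("spark.sql.shuffle.partitions", "50")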
Shuffle Partitions without AQE:
Before we see how to optimize the shuffle partitions, let’s look at the problem we are trying to solve. Take the example below:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Adaptive Query Execution") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

# Read a tiny CSV file and force it into 500 partitions
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://aparup-files/sales.csv") \
    .repartition(500)
In the above code, I am reading a small file (size: 226 B) and increasing the partitions to 500. The increase forces Spark to use the maximum number of shuffle partitions.
df.show(4, False)

# GroupBy to trigger a shuffle
df.groupBy("customerId").count().count()

# sales_df = df.groupBy("customerId").count()
# sales_df.write.parquet("gs://aparup-files/spark2.parquet")

sc.stop()
Observing the Job: Spark 2 doesn’t have AQE
When I run this on the Spark 2 cluster it throws an error, because AQE is disabled by default and cannot be used there: the configuration ‘spark.sql.adaptive.coalescePartitions.enabled’, which determines the required number of partitions from runtime metrics, does not exist in Spark 2.
Spark 2 without AQE enabled
Spark 3 with AQE enabled
Spark 2 Observing Stages
As you can observe from the image, stage id 14 ran 200 tasks even though the data was very small.
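The same number can be confirmed programmatically. A minimal sketch, assuming the df built earlier is still in scope:
# Without AQE, a shuffle inherits spark.sql.shuffle.partitions (200 by default),
# so this prints 200 even though the input is only a few hundred bytes.
grouped = df.groupBy("customerId").count()
print(grouped.rdd.getNumPartitions())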
Spark 2 Observing Dags
From the image, you can observe that there was a lot of shuffle.
Optimizing Shuffle Partitions in AQE
Enabling the configuration
To use AQE we need to set spark.sql.adaptive.enabled to true.
spark.conf.set("spark.sql.adaptive.enabled", "true")
To use the shuffle partition optimization we also need to set spark.sql.adaptive.coalescePartitions.enabled to true.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
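Putting it together, here is a minimal sketch of the same job on the Spark 3 cluster with AQE enabled at session-build time (the sales.csv path is the same illustrative one used above):
from pyspark.sql import SparkSession

# Enable AQE and shuffle-partition coalescing up front
spark = SparkSession \
    .builder \
    .appName("Spark Adaptive Query Execution") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("gs://aparup-files/sales.csv") \
    .repartition(500)

# The same shuffle is now coalesced into far fewer partitions at runtime
df.groupBy("customerId").count().count()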
Spark 3 Observing Stages
From the image you can observe that most of the stages are skipped altogether, as Spark figured out that most of the partitions are empty.
Spark 3 Observing Dags
From the image, you can observe that most of the shuffle was skipped. There is a CoalescedShuffleReader which combines all the shuffle partitions into 1.
So just by enabling a few configurations, we can have AQE dynamically optimize the shuffle partitions.
New EXPLAIN Format
In Spark, the EXPLAIN function returns the details of the Spark SQL query execution stages, i.e. how the query is optimized.
Challenge in Spark 2: it is not easy to understand how a query is optimized, because the output is too complex.
Key feature of EXPLAIN in Spark 3:
An easy-to-read query execution plan, produced by adding mode="formatted" to explain.
query = "select customerId, max(amountPaid) from spark3.sample_tbl where customerId > 0 group by customerId having max(amountPaid) > 0"
Explain in Spark 2
Not easy to understand how a query is optimized
Explain in Spark 3
Easy to Read Query Plan
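For reference, a minimal sketch of how the two plans above are produced, assuming the table spark3.sample_tbl exists in the metastore:
# Spark 2 style: extended output, verbose and hard to scan
spark.sql(query).explain(True)

# Spark 3 only: the formatted mode gives a compact operator tree
# followed by a numbered list of operator details
spark.sql(query).explain(mode="formatted")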
Dataframe tail function
Many times in our code, we would like to read a few rows from a DataFrame.
For this we use the head function on the DataFrame, which is internally implemented by reading only the needed number of rows, accessing one partition at a time from the beginning.
But to access values from the last partition of a DataFrame, up to Spark 2 there was no straightforward way.
So in Spark 3 a new function, tail, has been introduced for reading values from the last partition of a DataFrame.
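A minimal sketch of the new API, reusing the sales DataFrame from earlier; both calls collect rows to the driver, so keep the row count small:
# head scans partitions from the beginning; tail (new in Spark 3) scans
# from the end. Both return a list of Row objects.
print(df.head(3))
print(df.tail(3))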
Useful Resources: