Apache Spark 3.0 DPP

Aparup Chatterjee

An introduction to Apache Spark 3.x Dynamic Partition Pruning (DPP)

Another big improvement in Spark 3.x is Dynamic Partition Pruning (DPP).

Before going into detail about DPP, I would like to explain its two key building blocks:

1. Filter Pushdown

2. Partition Pruning

GCP-based Big Data Component Details Used

Spark 3.x based Environment Details:

  • Hadoop 3.2
  • Spark 3.1
  • Python 3.8

Spark 2.x based Environment Details:

  • Hadoop 2.9
  • Spark 2.3
  • Python 2.7.14

Source: Spark + AI Summit North America 2020, Spark 3.x official docs & Databricks

Filter Pushdown / Predicate Pushdown

Filter Pushdown (also called Predicate Pushdown) is an optimization in Spark SQL.

One strategy to improve Spark SQL query performance is to reduce the amount of data read (I/O) and transferred from the data storage layer to the executors.

Generally, when we use a filter/where condition in a Spark SQL query, the Catalyst optimizer attempts to "push down" the filtering operation to the data source layer to save I/O cost, instead of reading the full files, sending them across to the executors, and only then filtering.

Example:
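As a minimal sketch, assume a hypothetical Parquet dataset with an integer age column (the path and column names below are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("filter-pushdown-demo").getOrCreate()

# Hypothetical Parquet dataset with an integer `age` column
df = spark.read.parquet("/data/people_parquet")

# The comparison is pushed down to the Parquet reader, so row groups whose
# min/max statistics exclude age > 40 are skipped before any rows reach Spark
df.filter(df.age > 40).explain()
```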

Filter Pushdown Limitation

Filter Pushdown depends on the data schema type: if the filter condition requires casting the content of a field, the cast function cannot be pushed down and Spark will read the full file.

Example: the age column is read with a string data type from a CSV file/Hive table, so a numeric filter on it requires a cast.

So, as data engineers, we need to make sure the column type matches the filter condition (casting beforehand if required) so that Spark can use Filter Pushdown for optimization.

You can achieve this by casting on the fly or by adding a custom schema to the DataFrame.
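A minimal sketch of both approaches, assuming a hypothetical CSV source where age arrives as a string:

```python
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Option 1: cast on the fly, so later filters compare integers to integers
# (hypothetical path and columns)
df_csv = spark.read.option("header", True).csv("/data/people_csv")
df_cast = df_csv.withColumn("age", col("age").cast(IntegerType()))
df_cast.filter(col("age") > 40).explain()

# Option 2: supply a custom schema so `age` is already an integer when read;
# the filter can then be pushed down without any cast appearing in the plan
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("city", StringType()),
])
df_typed = (spark.read.option("header", True)
            .schema(schema)
            .csv("/data/people_csv"))
df_typed.filter(col("age") > 40).explain()
```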

Partition Pruning

Partition Pruning is another optimization, and it builds on the Predicate Pushdown/Filter Pushdown mechanism.

When filtering on a partitioned column, the Catalyst optimizer pushes the partition filters down to the data source level. The scan then reads only the directories that match the partition filters (not the actual content of every file), thus reducing disk I/O.

Example: if the filter condition is on the partition column, there is no need for a regular Predicate Pushdown on the file contents; instead, Spark pushes the filter down as a partition filter, reads only the matching partition directories, and the remaining partitions are pruned/skipped, thereby avoiding unnecessary I/O.

Let's take the same dataset, with the table partitioned by age:
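A minimal sketch (hypothetical path) that writes the same data partitioned by age and then filters on the partition column:

```python
# Write the dataset partitioned by `age`, creating one directory per age value
df.write.mode("overwrite").partitionBy("age").parquet("/data/people_partitioned")

# Read it back and filter on the partition column
part_df = spark.read.parquet("/data/people_partitioned")
filtered_part = part_df.filter(part_df.age > 40)
```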

Let's have a look at the physical plan:
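Calling explain() on the filtered DataFrame shows the filter on the scan node under PartitionFilters rather than PushedFilters:

```python
# The scan node lists the condition under PartitionFilters, e.g.
# PartitionFilters: [isnotnull(age#...), (age#... > 40)], PushedFilters: []
filtered_part.explain()
```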

As you can see, Spark prunes/skips the irrelevant partitions and reads only the required partition, with the filter pushed down to the data source level. This pruning/skipping of data gives a big performance boost.

This is also called Static Partition Pruning, since the filter values are already known at query compile time.

Quick Recap of Filter Pushdown and Partition Pruning

Filter Pushdown: When you filter on a column that is not a partition column, Spark has to consider every part file in that Parquet table's folder. With pushdown filtering, Spark uses the footer of each part file (where min, max, and count statistics are stored) to determine whether your search value can fall within that file's range. If yes, Spark reads the file fully; if not, Spark skips the whole file, saving you the cost of a full read.

When we filter on the non-partitioned DataFrame (df), the pushed filters are:

PushedFilters: [IsNotNull(age), GreaterThan(age,40)]

Partition Pruning: When you filter on the columns you partitioned by, Spark skips the non-matching partition directories completely, so they cost you no I/O at all.

When we filter on the partitioned DataFrame, the filters are:

PartitionFilters: [isnotnull(age#102), (age#102 > 40)], PushedFilters: []

Spark doesn't need to push down the age filter when working with the partitioned DataFrame because it can use a partition filter, which is a lot faster.

Dynamic Partition Pruning

  • Dynamic partition pruning improves job performance by more accurately selecting the specific partitions within a table that need to be read and processed for a given query.
  • Dynamic partition pruning allows the Spark engine to infer dynamically at runtime, from the data in the selected/filtered columns, which partitions need to be read and which can be safely eliminated.
  • By reducing the amount of data read and processed, significant time is saved in job execution.

In real life it is very useful for data engineers working with a star schema data warehouse, where we generally join a big fact table with a comparatively small dimension table.

Let's look into the example below:

Tables: withPartition_fact (Fact table) & withoutPartition_dim (Dimension table)

withPartition_fact - partitioned by age
withoutPartition_dim - not partitioned
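A rough PySpark sketch of such a query, assuming both tables are registered in the metastore and joined on the fact table's partition column age (the join key is an assumption; the table names follow the article):

```python
# Fact table, partitioned by `age`; dimension table, small and unpartitioned
fact_df = spark.table("withPartition_fact")
dim_df = spark.table("withoutPartition_dim")

# Join on the fact table's partition column and filter on the dimension side
joined = (fact_df.join(dim_df, on="age", how="inner")
                 .filter(dim_df["city"] == "Bangalore"))
```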

Physical Plan
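As before, explain() on the joined DataFrame exposes the scan details:

```python
# On the fact table scan, PartitionFilters should contain a
# dynamic pruning expression derived from the dimension-side filter
joined.explain()
```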

Now it's time to deep dive.

Dimension Table Phase

Spark uses the Filter Pushdown/Predicate Pushdown method, which skips reading irrelevant data from the dimension table before the actual data fetching phase.

Fact Table Phase

Here you can notice the partition filter applied to the fact table scan; it is formed internally from the dimension table filter and finally appears as a dynamic pruning expression.

Internal Architecture and DAG Level

From the above views we can see that Spark picks only the fact table partitions relevant to the dimension table filter condition and prunes the other partitions of the fact table.

According to our DataFrame query, the dimension table filter is city = "Bangalore". From the fact table dataset we can see that only 2 records match, each with a different age (i.e. 2 partitions).

From the DAG we can also see that, using Dynamic Partition Pruning, Spark fetched only those 2 partitions (number of files read: 2) and all the remaining irrelevant partitions were pruned out.

The Spark 3 config responsible for DPP is spark.sql.optimizer.dynamicPartitionPruning.enabled, which is set to true by default.
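It can be checked or toggled at runtime, for example:

```python
# DPP flag (on by default in Spark 3.x)
print(spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled"))
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```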

Spark Internal Dynamic Partition Pruning Steps

1. Spark builds a hash table over the filtered dimension table and forms an inner subquery out of it, which is broadcast to all the executors (subquery broadcast).

2. Using the subquery broadcast, the join can be executed without requiring a shuffle.

3. Spark then probes that hash table with the rows coming from the fact table on each worker node during its scanning phase, so that no irrelevant data is carried into the join phase (see the conceptual sketch below).
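Conceptually, the effect is as if the fact table were filtered by the join keys that survive the dimension filter. The sketch below (reusing the hypothetical fact_df/dim_df from earlier) only illustrates that idea; DPP performs this at runtime inside the plan, without collecting keys to the driver as done here:

```python
# Illustration only: gather the dimension keys that pass the filter...
matching_ages = [row["age"]
                 for row in dim_df.filter(dim_df["city"] == "Bangalore")
                                  .select("age").distinct().collect()]

# ...and read only the fact partitions for those keys; everything else is pruned
pruned_fact = fact_df.filter(fact_df["age"].isin(matching_ages))
```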

Spark Dynamic Partition Pruning Factors

  • In Spark 3 it is enabled by default.
  • The fact/big table must be partitioned by a join key column.
  • It is best suited for the star schema data warehousing model.

