Wednesday, February 3, 2021

Spark Tuning -- Predicate Pushdown for Parquet

Goal:

This article explains predicate pushdown for Parquet in Spark.

Solution:

Spark can push a predicate down into the Parquet scan phase, which reduces the amount of data that has to be read.

It does this by checking the metadata of the Parquet files (for example, the min/max statistics stored for each column) and skipping data that cannot match the filter.
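To make the idea concrete, here is a minimal sketch in plain Scala of how min/max statistics can rule out a chunk of data for an equality filter. The RowGroupStats class, the RowGroupPruning object, and the sample ranges are hypothetical illustrations, not Spark or Parquet APIs; the real reader is more involved. It can be pasted into spark-shell or a Scala REPL.

```scala
// Hypothetical model of per-chunk statistics; not an actual Spark or Parquet class.
final case class RowGroupStats(min: Long, max: Long, rowCount: Long)

object RowGroupPruning {
  // An equality filter can only match if the value lies inside [min, max].
  def mightContain(stats: RowGroupStats, value: Long): Boolean =
    value >= stats.min && value <= stats.max

  // Keep only the chunks that still have to be read for "column = value".
  def rowGroupsToRead(groups: Seq[RowGroupStats], value: Long): Seq[RowGroupStats] =
    groups.filter(mightContain(_, value))
}

// Made-up statistics for three chunks whose value ranges overlap heavily.
val groups = Seq(
  RowGroupStats(min = 0L, max = 396342L, rowCount = 1000L),
  RowGroupStats(min = 5L, max = 396000L, rowCount = 1000L),
  RowGroupStats(min = 0L, max = 250000L, rowCount = 1000L)
)

// A value inside every range: nothing can be skipped.
val inRange = RowGroupPruning.rowGroupsToRead(groups, 20000L)

// A value outside every range: every chunk can be skipped.
val outOfRange = RowGroupPruning.rowGroupsToRead(groups, 9999999999L)

println(s"in-range filter reads ${inRange.size} chunks, out-of-range filter reads ${outOfRange.size}")
```

The sketch only handles equality on a single Long column; the point is that the decision is made from metadata alone, before any row data is read.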

Note: Refer to this blog on How to use pyarrow to view the metadata information inside a Parquet file.

This feature is controlled by a parameter named spark.sql.parquet.filterPushdown (default is true).
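As a small sketch, assuming a spark-shell session where the variable spark is already defined, the parameter can be inspected and changed for the current session through spark.conf:

```scala
// Assumes spark-shell, where `spark` (a SparkSession) already exists.
val key = "spark.sql.parquet.filterPushdown"

// Print the value currently in effect for this session.
println(s"$key = ${spark.conf.get(key)}")

// Turn the feature off for this session ...
spark.conf.set(key, "false")

// ... and back on again.
spark.conf.set(key, "true")
```

Changing the value this way only affects the current session; it does not modify any cluster-wide configuration file.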

As an example, let's use the Parquet files created in another blog post.

1. Create a DataFrame on parquet files

val targetdir = "/tmp/test_column_projection/newdf"
val readdf = spark.read.format("parquet").load(targetdir)
readdf.createOrReplaceTempView("readdf")

2. Let's look at the data distribution for column "Index".

scala> spark.sql("SELECT min(Index), max(Index), count(distinct Index),count(*) FROM readdf").show
+----------+----------+---------------------+--------+
|min(Index)|max(Index)|count(DISTINCT Index)|count(1)|
+----------+----------+---------------------+--------+
|         0|    396342|               396343| 6240991|
+----------+----------+---------------------+--------+

As shown above, the values of column "Index" range from 0 to 396342.

Knowing this, we can design the tests below to show how performance differs for different filters.

3. Query 1 and its explain plan

val q1  = "SELECT * FROM readdf WHERE Index=20000"
val result1 = spark.sql(q1)
result1.explain
result1.collect

Query 1 has to scan a lot of data because Index=20000 falls inside the value range recorded for most of the Parquet chunks.

The explain plan:

== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 20000))
   +- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,20000)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...

4. Query 2 and its explain plan

val q2  = "SELECT * FROM readdf where Index=9999999999"
val result2 = spark.sql(q2)
result2.explain
result2.collect

Query 2 needs to scan very little data because Index=9999999999 is outside the value range of that column.

The explain plan:

== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 9999999999))
   +- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...

5. Query 3 and its explain plan after disabling spark.sql.parquet.filterPushdown

Everything is the same as Query 2; the only difference is that we manually disabled this feature by setting the following in the config:

config("spark.sql.parquet.filterPushdown",false)

Because this feature is disabled, Spark has to scan all of the Parquet data.

The explain plan:

== Physical Plan ==
*(1) Project [Arrival_Time#26L, Creation_Time#27L, Device#28, Index#29L, Model#30, User#31, gt#32, x#33, y#34, z#35]
+- *(1) Filter (isnotnull(Index#29L) && (Index#29L = 9999999999))
   +- *(1) FileScan parquet [Arrival_Time#26L,Creation_Time#27L,Device#28,Index#29L,Model#30,User#31,gt#32,x#33,y#34,z#35] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...

Analysis:

1. Explain plan

As we can see, all of the explain plans look the same. 

Even after we disabled spark.sql.parquet.filterPushdown, the explain plan did not show any difference between Query 2 and Query 3.

This means that, at least from the query plan, we cannot tell whether the predicate is actually pushed down.

All of the explain plans report the same pushed filters:

PushedFilters: [IsNotNull(Index), EqualTo(Index,9999999999)]

Note: These tests were done on Spark 2.4.4; this behavior may change in future releases.

2. Event log/Web UI

Query 1's stage shows that the sum of Input Size is 142.3MB and the sum of Records is 6240991.

Query 2's stage shows that the sum of Input Size is 44.4KB and the sum of Records is 0.

Query 3's stage shows that the sum of Input Size is 142.3MB and the sum of Records is 6240991.

The metrics above clearly show that the selectivity of predicate pushdown depends on both the filter and the metadata of the Parquet files.

The performance difference between Query 2 and Query 3 shows how powerful this feature is.

Note: If the metadata indicates that most or all of the Parquet files may contain data matching the filter, then this feature may not provide good selectivity. So data distribution also matters here.
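Since the explain plan does not reveal whether pushdown took effect, it can help to read the input metrics without opening the Web UI. The following is a rough sketch, assuming a spark-shell session with the "readdf" view from step 1 already registered. The InputMetricsListener class is a hypothetical helper written for this example; it relies on Spark's SparkListener developer API, and its totals may not line up exactly with what the Web UI reports.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper: sums the input metrics of every task that ends while it is registered.
class InputMetricsListener extends SparkListener {
  val bytesRead = new AtomicLong(0L)
  val recordsRead = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) { // task metrics can be missing for failed tasks
      bytesRead.addAndGet(metrics.inputMetrics.bytesRead)
      recordsRead.addAndGet(metrics.inputMetrics.recordsRead)
    }
  }
}

val listener = new InputMetricsListener
spark.sparkContext.addSparkListener(listener)

// Same filter as Query 2; assumes the "readdf" temp view exists.
spark.sql("SELECT * FROM readdf WHERE Index=9999999999").collect()

// Listener events are delivered asynchronously, so wait briefly before reading the totals.
// This fixed sleep is a crude simplification for the sketch.
Thread.sleep(1000)
println(s"bytes read = ${listener.bytesRead.get}, records read = ${listener.recordsRead.get}")

spark.sparkContext.removeSparkListener(listener)
```

Running the same query once with spark.sql.parquet.filterPushdown enabled and once with it disabled, and comparing the two printed totals, gives the same kind of comparison as Query 2 versus Query 3.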

Note: Here is the Complete Sample Code

