Monday, March 15, 2021

Spark Tuning -- Dynamic Partition Pruning

Goal:

This article explains the Dynamic Partition Pruning (DPP) feature introduced in Spark 3.0.

Env:

Spark 3.0.2

Concept:

The Dynamic Partition Pruning feature was introduced by SPARK-11150.

This JIRA also provides a minimal example query together with its design:

SELECT t1.id, t2.pKey FROM t1 JOIN t2 ON t1.pKey = t2.pKey AND t2.id < 2;

Here, let's assume "t1" is a very large fact table with partition key column "pKey", and "t2" is a small dimension table.

Since there is a filter on "t2" -- "t2.id < 2", internally DPP can create a subquery:

SELECT t2.pKey FROM t2 WHERE t2.id < 2;

and then broadcast this sub-query result, so that we can use this result to prune partitions for "t1".  

In the meantime, the sub-query result is reused. See the graph below, taken from a Databricks slide deck:

[Figure: DPP diagram from the Databricks slides]

As a result, we can skip a large part of the table scan on the fact table side, which can bring a significant performance gain.
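To make the mechanism concrete, here is a minimal plain-Scala sketch of the idea described above (no Spark involved; it can be pasted into spark-shell or any Scala REPL). The case classes, the sample data and the names `dimKeys` and `prunedScan` are all made up for illustration. Spark's real implementation works on query plans, not on in-memory collections.

```scala
// Toy model of dynamic partition pruning. All names and data are hypothetical.
final case class DimRow(id: Long, pKey: Long)
final case class FactRow(id: Long, pKey: Long)

object DppSketch {
  // The fact table is modeled as a map: partition key -> rows in that partition.
  type PartitionedFact = Map[Long, Seq[FactRow]]

  // Step 1: run the "sub-query" on the small table:
  //   SELECT t2.pKey FROM t2 WHERE t2.id < 2
  def dimKeys(t2: Seq[DimRow]): Set[Long] =
    t2.filter(_.id < 2).map(_.pKey).toSet

  // Step 2: use the resulting key set to decide which partitions of t1
  // are scanned at all. Returns (number of partitions read, rows read).
  def prunedScan(t1: PartitionedFact, keys: Set[Long]): (Int, Seq[FactRow]) = {
    val kept = t1.filter { case (pKey, _) => keys.contains(pKey) }
    (kept.size, kept.values.flatten.toSeq)
  }
}

// 1000 partitions on the fact side, 100 rows on the dimension side.
val t1: DppSketch.PartitionedFact =
  (0L until 1000L).map(k => k -> Seq(FactRow(k, k))).toMap
val t2 = (0L until 100L).map(k => DimRow(k, k))

val keys = DppSketch.dimKeys(t2)
val (partitionsRead, rows) = DppSketch.prunedScan(t1, keys)
println(s"partitions read: $partitionsRead of ${t1.size}")
```

In this toy model only the partitions whose key survives the filter on "t2" are touched; the rest of the fact table is never read.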

The parameter to enable or disable DPP is:

  • spark.sql.optimizer.dynamicPartitionPruning.enabled  (true by default)
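As a small sketch, the parameter can be inspected and toggled per session from spark-shell. This assumes `spark` is the SparkSession that spark-shell creates automatically; the variable name `dppKey` is only for illustration.

```scala
// Assumes `spark` is the SparkSession provided by spark-shell.
val dppKey = "spark.sql.optimizer.dynamicPartitionPruning.enabled"

// Read the value currently in effect for this session.
println(spark.conf.get(dppKey))

// Turn DPP off for the current session only, e.g. to compare query plans ...
spark.conf.set(dppKey, "false")

// ... and turn it back on afterwards.
spark.conf.set(dppKey, "true")
```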

Spark is not the only product using DPP; other query engines such as Impala and Hive on Tez also have this feature.

Solution:

1. CPU mode

Here is a simple example query (run in spark-shell) that helps us check whether DPP is used:

spark.range(1000).select(col("id"), col("id").as("k")).write.partitionBy("k").format("parquet").mode("overwrite").save("/tmp/myfact")
spark.range(100).select(col("id"), col("id").as("k")).write.format("parquet").mode("overwrite").save("/tmp/mydim")
spark.read.parquet("/tmp/myfact").createOrReplaceTempView("fact")
spark.read.parquet("/tmp/mydim").createOrReplaceTempView("dim")
sql("SELECT fact.id, fact.k FROM fact JOIN dim ON fact.k = dim.k AND dim.id < 2").explain

The physical plan is:

scala> sql("SELECT fact.id, fact.k FROM fact JOIN dim ON fact.k = dim.k AND dim.id < 2").explain
== Physical Plan ==
*(2) Project [id#14L, k#15]
+- *(2) BroadcastHashJoin [cast(k#15 as bigint)], [k#19L], Inner, BuildRight
   :- *(2) ColumnarToRow
   :  +- FileScan parquet [id#14L,k#15] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/myfact], PartitionFilters: [isnotnull(k#15), dynamicpruningexpression(cast(k#15 as bigint) IN dynamicpruning#24)], PushedFilters: [], ReadSchema: struct<id:bigint>
   :        +- SubqueryBroadcast dynamicpruning#24, 0, [k#19L], [id=#118]
   :           +- ReusedExchange [k#19L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#96]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#96]
      +- *(1) Project [k#19L]
         +- *(1) Filter ((isnotnull(id#18L) AND (id#18L < 2)) AND isnotnull(k#19L))
            +- *(1) ColumnarToRow
               +- FileScan parquet [id#18L,k#19L] Batched: true, DataFilters: [isnotnull(id#18L), (id#18L < 2), isnotnull(k#19L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>

 Let's compare it to a plan with DPP disabled:

scala> sql("set spark.sql.optimizer.dynamicPartitionPruning.enabled=false")
res14: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> sql("SELECT fact.id, fact.k FROM fact JOIN dim ON fact.k = dim.k AND dim.id < 2").explain
== Physical Plan ==
*(2) Project [id#35L, k#36]
+- *(2) BroadcastHashJoin [cast(k#36 as bigint)], [k#40L], Inner, BuildRight
   :- *(2) ColumnarToRow
   :  +- FileScan parquet [id#35L,k#36] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/myfact], PartitionFilters: [isnotnull(k#36)], PushedFilters: [], ReadSchema: struct<id:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#288]
      +- *(1) Project [k#40L]
         +- *(1) Filter ((isnotnull(id#39L) AND (id#39L < 2)) AND isnotnull(k#40L))
            +- *(1) ColumnarToRow
               +- FileScan parquet [id#39L,k#40L] Batched: true, DataFilters: [isnotnull(id#39L), (id#39L < 2), isnotnull(k#40L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>

As you can see, when DPP is enabled, the keywords "SubqueryBroadcast" and "ReusedExchange" show up right under the fact table scan.

The fact table scan itself carries the keyword "dynamicpruningexpression" in its PartitionFilters.
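The same check can be done programmatically instead of reading the plan by eye. Below is a minimal sketch: `planUsesDpp` is a hypothetical helper (not a Spark API) that simply looks for the keyword in the text of a physical plan. The two sample strings are the PartitionFilters fragments from the two plans above.

```scala
// Hypothetical helper: decide from the text of a physical plan
// whether a dynamic pruning filter was planned on some scan.
def planUsesDpp(planText: String): Boolean =
  planText.contains("dynamicpruningexpression")

// PartitionFilters fragments copied from the two plans shown above.
val withDpp =
  "PartitionFilters: [isnotnull(k#15), dynamicpruningexpression(cast(k#15 as bigint) IN dynamicpruning#24)]"
val withoutDpp =
  "PartitionFilters: [isnotnull(k#36)]"

println(planUsesDpp(withDpp))     // true
println(planUsesDpp(withoutDpp))  // false
```

In spark-shell the helper could be fed with `df.queryExecution.executedPlan.toString`, where `df` is the DataFrame returned by `sql(...)` for the example query.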

If we let the query run with DPP enabled, we can check the runtime query plan in the SQL tab of the Spark UI.

There, the fact table scan node shows "dynamic partition pruning time: 41 ms" and "number of partitions read: 2", which means DPP is taking effect.

Now let's take a look at a more complex example, q98 from TPC-DS:

select i_item_desc, i_category, i_class, i_current_price
,sum(ss_ext_sales_price) as itemrevenue
,sum(ss_ext_sales_price)*100/sum(sum(ss_ext_sales_price)) over
(partition by i_class) as revenueratio
from
store_sales, item, date_dim
where
ss_item_sk = i_item_sk
and i_category in ('Sports', 'Books', 'Home')
and ss_sold_date_sk = d_date_sk
and d_date between cast('1999-02-22' as date)
and (cast('1999-02-22' as date) + interval '30' day)
group by
i_item_id, i_item_desc, i_category, i_class, i_current_price
order by
i_category, i_class, i_item_id, i_item_desc, revenueratio;

We just need to focus on the fact table "store_sales" joining the dimension table "date_dim" on the join key "ss_sold_date_sk = d_date_sk".

The column "ss_sold_date_sk" is also the partition key for "store_sales".

"date_dim" has a filter on column "d_date" to only fetch 30 days' data.

Now the query plan is:

== Physical Plan ==
*(7) Project [i_item_desc#97, i_category#105, i_class#103, i_current_price#98, itemrevenue#159, revenueratio#160]
+- *(7) Sort [i_category#105 ASC NULLS FIRST, i_class#103 ASC NULLS FIRST, i_item_id#94 ASC NULLS FIRST, i_item_desc#97 ASC NULLS FIRST, revenueratio#160 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(i_category#105 ASC NULLS FIRST, i_class#103 ASC NULLS FIRST, i_item_id#94 ASC NULLS FIRST, i_item_desc#97 ASC NULLS FIRST, revenueratio#160 ASC NULLS FIRST, 20), true, [id=#490]
      +- *(6) Project [i_item_desc#97, i_category#105, i_class#103, i_current_price#98, itemrevenue#159, ((_w0#170 * 100.0) / _we0#172) AS revenueratio#160, i_item_id#94]
         +- Window [sum(_w1#171) windowspecdefinition(i_class#103, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#172], [i_class#103]
            +- *(5) Sort [i_class#103 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(i_class#103, 20), true, [id=#482]
                  +- *(4) HashAggregate(keys=[i_item_id#94, i_item_desc#97, i_category#105, i_class#103, i_current_price#98], functions=[sum(ss_ext_sales_price#84)], output=[i_item_desc#97, i_category#105, i_class#103, i_current_price#98, itemrevenue#159, _w0#170, _w1#171, i_item_id#94])
                     +- Exchange hashpartitioning(i_item_id#94, i_item_desc#97, i_category#105, i_class#103, i_current_price#98, 20), true, [id=#478]
                        +- *(3) HashAggregate(keys=[i_item_id#94, i_item_desc#97, i_category#105, i_class#103, knownfloatingpointnormalized(normalizenanandzero(i_current_price#98)) AS i_current_price#98], functions=[partial_sum(ss_ext_sales_price#84)], output=[i_item_id#94, i_item_desc#97, i_category#105, i_class#103, i_current_price#98, sum#175])
                           +- *(3) Project [ss_ext_sales_price#84, i_item_id#94, i_item_desc#97, i_current_price#98, i_class#103, i_category#105]
                              +- *(3) BroadcastHashJoin [ss_sold_date_sk#92], [d_date_sk#115], Inner, BuildRight
                                 :- *(3) Project [ss_ext_sales_price#84, ss_sold_date_sk#92, i_item_id#94, i_item_desc#97, i_current_price#98, i_class#103, i_category#105]
                                 :  +- *(3) BroadcastHashJoin [ss_item_sk#71], [i_item_sk#93], Inner, BuildRight
                                 :     :- *(3) Project [ss_item_sk#71, ss_ext_sales_price#84, ss_sold_date_sk#92]
                                 :     :  +- *(3) Filter isnotnull(ss_item_sk#71)
                                 :     :     +- *(3) ColumnarToRow
                                 :     :        +- FileScan parquet tpcds.store_sales[ss_item_sk#71,ss_ext_sales_price#84,ss_sold_date_sk#92] Batched: true, DataFilters: [isnotnull(ss_item_sk#71)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/data/tpcds_100G_parquet/store_sales/ss_sold_date_sk=24..., PartitionFilters: [isnotnull(ss_sold_date_sk#92), dynamicpruningexpression(ss_sold_date_sk#92 IN dynamicpruning#173)], PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int,ss_ext_sales_price:double>
                                 :     :              +- SubqueryBroadcast dynamicpruning#173, 0, [d_date_sk#115], [id=#466]
                                 :     :                 +- ReusedExchange [d_date_sk#115], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#426]
                                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#417]
                                 :        +- *(1) Project [i_item_sk#93, i_item_id#94, i_item_desc#97, i_current_price#98, i_class#103, i_category#105]
                                 :           +- *(1) Filter (i_category#105 IN (Sports,Books,Home) AND isnotnull(i_item_sk#93))
                                 :              +- *(1) ColumnarToRow
                                 :                 +- FileScan parquet tpcds.item[i_item_sk#93,i_item_id#94,i_item_desc#97,i_current_price#98,i_class#103,i_category#105] Batched: true, DataFilters: [i_category#105 IN (Sports,Books,Home), isnotnull(i_item_sk#93)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/data/tpcds_100G_parquet/item], PartitionFilters: [], PushedFilters: [In(i_category, [Sports,Books,Home]), IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_item_id:string,i_item_desc:string,i_current_price:double,i_class:string,i_...
                                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#426]
                                    +- *(2) Project [d_date_sk#115]
                                       +- *(2) Filter (((isnotnull(d_date#117) AND (d_date#117 >= 10644)) AND (d_date#117 <= 10674)) AND isnotnull(d_date_sk#115))
                                          +- *(2) ColumnarToRow
                                             +- FileScan parquet tpcds.date_dim[d_date_sk#115,d_date#117] Batched: true, DataFilters: [isnotnull(d_date#117), (d_date#117 >= 10644), (d_date#117 <= 10674), isnotnull(d_date_sk#115)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/data/tpcds_100G_parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date), GreaterThanOrEqual(d_date,1999-02-22), LessThanOrEqual(d_date,1999-03-24), Is..., ReadSchema: struct<d_date_sk:int,d_date:date>
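One detail in the plan above may look odd: the filter on "date_dim" is written in the query as dates, but the Filter node shows plain integers (10644 and 10674). Spark stores DATE values internally as the number of days since 1970-01-01. A small plain-Scala check (no Spark needed; the variable names are only for illustration) reproduces the two numbers:

```scala
import java.time.LocalDate

// cast('1999-02-22' as date)
val start = LocalDate.parse("1999-02-22")
// cast('1999-02-22' as date) + interval '30' day
val end = start.plusDays(30)

// Days since 1970-01-01, the representation seen in the Filter node.
val startDays = start.toEpochDay
val endDays = end.toEpochDay

// prints: 1999-02-22 -> 10644, 1999-03-24 -> 10674
println(s"$start -> $startDays, $end -> $endDays")
```

These match "d_date#117 >= 10644" and "d_date#117 <= 10674" in the Filter node, as well as the dates in the PushedFilters of the "date_dim" scan.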

The key point is:

:     :        +- FileScan parquet tpcds.store_sales[ss_item_sk#71,ss_ext_sales_price#84,ss_sold_date_sk#92] Batched: true, DataFilters: [isnotnull(ss_item_sk#71)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/data/tpcds_100G_parquet/store_sales/ss_sold_date_sk=24..., PartitionFilters: [isnotnull(ss_sold_date_sk#92), dynamicpruningexpression(ss_sold_date_sk#92 IN dynamicpruning#173)], PushedFilters: [IsNotNull(ss_item_sk)], ReadSchema: struct<ss_item_sk:int,ss_ext_sales_price:double>
:     :              +- SubqueryBroadcast dynamicpruning#173, 0, [d_date_sk#115], [id=#466]
:     :                 +- ReusedExchange [d_date_sk#115], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#426]

The fact table scan has DPP enabled, as shown by "PartitionFilters: [isnotnull(ss_sold_date_sk#92), dynamicpruningexpression(ss_sold_date_sk#92 IN dynamicpruning#173)]".

"dynamicpruning#173" basically comes from the broadcasted sub-query.

2. GPU mode

Now let's try the same minimal query using the RAPIDS Accelerator for Apache Spark (current release 0.3) to see what the query plan looks like on GPU:

scala> sql("SELECT fact.id, fact.k FROM fact JOIN dim ON fact.k = dim.k AND dim.id < 2").explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [id#0L, k#1]
   +- GpuBroadcastHashJoin [cast(k#1 as bigint)], [k#5L], Inner, GpuBuildRight
      :- GpuFileGpuScan parquet [id#0L,k#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/myfact], PartitionFilters: [isnotnull(k#1), dynamicpruningexpression(cast(k#1 as bigint) IN dynamicpruning#10)], PushedFilters: [], ReadSchema: struct<id:bigint>
      :     +- SubqueryBroadcast dynamicpruning#10, 0, [k#5L], [id=#51]
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#50]
      :           +- GpuColumnarToRow false
      :              +- GpuProject [k#5L]
      :                 +- GpuCoalesceBatches TargetSize(2147483647)
      :                    +- GpuFilter ((gpuisnotnull(id#4L) AND (id#4L < 2)) AND gpuisnotnull(k#5L))
      :                       +- GpuFileGpuScan parquet [id#4L,k#5L] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L < 2), isnotnull(k#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
      +- GpuBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#70]
         +- GpuProject [k#5L]
            +- GpuCoalesceBatches TargetSize(2147483647)
               +- GpuFilter ((gpuisnotnull(id#4L) AND (id#4L < 2)) AND gpuisnotnull(k#5L))
                  +- GpuFileGpuScan parquet [id#4L,k#5L] Batched: true, DataFilters: [isnotnull(id#4L), (id#4L < 2), isnotnull(k#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nm:port/tmp/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>

As you can see, DPP also takes effect here, because the fact table scan contains:

PartitionFilters: [isnotnull(k#1), dynamicpruningexpression(cast(k#1 as bigint) IN dynamicpruning#10)]

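However, here the sub-query on the dimension table is executed twice: once under "SubqueryBroadcast" to build the pruning filter, and once under "GpuBroadcastExchange" for the join itself, because this plan has no "ReusedExchange".

This performance overhead should be minimal, since the "broadcast side" sub-query is normally very lightweight.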

The on-going improvement for DPP will be tracked under this issue.

This is why the current version of the FAQ also mentions it:

 "Is Dynamic Partition Pruning (DPP) Supported?
Yes, DPP still works. It might not be as efficient as it could be, and we are working to improve it."

Key Takeaways:

DPP is a good feature for star-schema queries.

It uses partition pruning and broadcast hash join together.

It currently only supports equi-joins.

The table to prune (the fact table) should be partitioned by the join key.
