Tuesday, March 16, 2021

Spark Tuning -- Adaptive Query Execution (1): Dynamically coalescing shuffle partitions

Goal:

This article explains Adaptive Query Execution (AQE)'s "Dynamically coalescing shuffle partitions" feature introduced in Spark 3.0.

Env:

 Spark 3.0.2

Concept:

Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. AQE is disabled by default; Spark SQL provides the umbrella configuration spark.sql.adaptive.enabled to turn it on or off.

In AQE on Spark 3.0, there are 3 features as below:

1. Dynamically coalescing shuffle partitions
2. Dynamically switching join strategies
3. Dynamically optimizing skew joins

This article focuses on the 1st feature, "Dynamically coalescing shuffle partitions".

This feature coalesces the post-shuffle partitions based on the map output statistics when both the spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled configurations are true.

In the tests below, we will change spark.sql.adaptive.coalescePartitions.minPartitionNum to 1, which controls the minimum number of shuffle partitions after coalescing. If we do not decrease it, its default value is the same as spark.sql.shuffle.partitions (which is 200 by default).

Another important setting is spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB) which controls the advisory size in bytes of the shuffle partition during adaptive optimization.
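
Before running the tests, a quick way to confirm the current value of each of these settings is the SET command in any Spark SQL session (shown below as a minimal check, not part of the demo itself):

-- Print the current value of each AQE-related setting; SET <key> returns the key and its value
SET spark.sql.adaptive.enabled;
SET spark.sql.adaptive.coalescePartitions.enabled;
SET spark.sql.adaptive.coalescePartitions.minPartitionNum;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes;
SET spark.sql.shuffle.partitions;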

Please refer to the Spark Performance Tuning guide for details on all other related parameters.

Solution:

The Databricks blog post "Adaptive Query Execution: Speeding Up Spark SQL at Runtime" comes with a pretty good demo notebook, which I will use for the following tests.

I will run the simple group-by query below, against the tables created by the demo instructions above, in different modes:

use aqe_demo_db;

SELECT s_date, sum(s_quantity) AS q
FROM sales
GROUP BY s_date
ORDER BY q DESC;
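
To print the physical plan shown in each test below, the same query can be prefixed with EXPLAIN (or EXPLAIN FORMATTED for the numbered-operator form), for example:

-- Print the physical plan without executing the query
EXPLAIN
SELECT s_date, sum(s_quantity) AS q
FROM sales
GROUP BY s_date
ORDER BY q DESC;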

1. Default settings without AQE

Explain plan:

*(3) Sort [q#10L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(q#10L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#79]
   +- *(2) HashAggregate(keys=[s_date#3], functions=[sum(cast(s_quantity#2 as bigint))], output=[s_date#3, q#10L])
      +- Exchange hashpartitioning(s_date#3, 200), ENSURE_REQUIREMENTS, [id=#75]
         +- *(1) HashAggregate(keys=[s_date#3], functions=[partial_sum(cast(s_quantity#2 as bigint))], output=[s_date#3, sum#19L])
            +- *(1) ColumnarToRow
               +- FileScan parquet aqe_demo_db.sales[s_quantity#2,s_date#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s_quantity:int,s_date:date>

Let's focus on the first pair of HashAggregate and Exchange operators, where we can examine the shuffle write and shuffle read size for each task.

As per UI:

The shuffle write per task is only around 13 KB, which leaves too little data for each downstream task to process.

Let's look at the stage-level metrics for stage 0 and stage 1 as per the above UI.

Stage 0's Shuffle Write Size: Avg 12.9 KB, 30 tasks

Stage 1's Shuffle Read Size: Avg 2.3 KB, 200 tasks
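
Without AQE, the usual workaround for those 200 tiny reduce tasks is to lower spark.sql.shuffle.partitions by hand, but that single value then applies to every shuffle in the session. A sketch, with an illustrative value that is not tuned for this data set:

-- Static tuning (no AQE): one global post-shuffle partition count for the whole session
SET spark.sql.shuffle.partitions = 20;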

Here is the final plan from the UI (for comparison later):
== Physical Plan ==
* Sort (7)
+- Exchange (6)
   +- * HashAggregate (5)
      +- Exchange (4)
         +- * HashAggregate (3)
            +- * ColumnarToRow (2)
               +- Scan parquet aqe_demo_db.sales (1)

2. Default settings with AQE on

set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;

Explain plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [q#34L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(q#34L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#119]
      +- HashAggregate(keys=[s_date#23], functions=[sum(cast(s_quantity#22 as bigint))], output=[s_date#23, q#34L])
         +- Exchange hashpartitioning(s_date#23, 200), ENSURE_REQUIREMENTS, [id=#116]
            +- HashAggregate(keys=[s_date#23], functions=[partial_sum(cast(s_quantity#22 as bigint))], output=[s_date#23, sum#43L])
               +- FileScan parquet aqe_demo_db.sales[s_quantity#22,s_date#23] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s_quantity:int,s_date:date>

Notice the keyword "AdaptiveSparkPlan"; as isFinalPlan=false indicates, this is not the final plan yet.

Again, let's focus on the first pair of HashAggregate and Exchange operators, where we can examine the shuffle write and shuffle read size for each task.

As per UI:

Now there is an extra "CustomShuffleReader" operator, which coalesces the partitions down to only 1 because the total partition data size is only about 400 KB.

Let's look at the stage-level metrics for stage 0 and stage 2 as per the above UI.

Stage 0's Shuffle Write Size: Avg 12.9 KB, 30 tasks (no change)

Stage 2's Shuffle Read Size: 386.6 KB, 1 task

So basically AQE coalesces all 200 shuffle partitions into 1: the total shuffle data (386.6 KB) is far below the 64 MB advisory partition size, and we allowed the minimum number of partitions to go down to 1.

Here is the final plan from the UI, in which you can find the "CustomShuffleReader" operators:

== Physical Plan ==
AdaptiveSparkPlan (12)
+- == Final Plan ==
   * Sort (11)
   +- CustomShuffleReader (10)
      +- ShuffleQueryStage (9)
         +- Exchange (8)
            +- * HashAggregate (7)
               +- CustomShuffleReader (6)
                  +- ShuffleQueryStage (5)
                     +- Exchange (4)
                        +- * HashAggregate (3)
                           +- * ColumnarToRow (2)
                              +- Scan parquet aqe_demo_db.sales (1)
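
Note that the two configurations mentioned in the Concept section act independently: keeping AQE on but disabling only the coalescing feature brings the shuffle read back to the 200 post-shuffle partitions. A minimal sketch to verify this:

-- Keep AQE on, but turn off only the "coalesce shuffle partitions" feature;
-- the query then reads the original 200 post-shuffle partitions again.
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.coalescePartitions.enabled = false;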

3. Modified settings with AQE on

set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;
set spark.sql.adaptive.advisoryPartitionSizeInBytes = 65536;

Here we just changed spark.sql.adaptive.advisoryPartitionSizeInBytes from the default 64 MB down to 64 KB (65536 bytes), so that we can tune the target number of coalesced partitions.

The explain plan is the same as in #2.

The only difference is that the number of partitions in Stage 2 now becomes 7, which is in line with expectations: about 386.6 KB of total shuffle data packed into coalesced partitions of at most ~64 KB each works out to roughly 6-7 partitions.

4. GPU Mode with AQE on (default settings)

Now let's try the same query using the RAPIDS Accelerator for Apache Spark (current release 0.3) together with Spark to see what the query plan looks like under GPU mode.
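
I will not repeat the RAPIDS Accelerator setup here. Assuming the plugin jars are already on the classpath and spark.plugins=com.nvidia.spark.SQLPlugin was set when the session started, the RAPIDS-specific switch below can be used to confirm whether SQL operators are offloaded to the GPU:

-- Assumption: RAPIDS Accelerator jars are on the classpath and the SQLPlugin is loaded.
-- spark.rapids.sql.enabled (a RAPIDS config, default true) controls GPU offload of SQL operators.
SET spark.rapids.sql.enabled;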

The explain plan may look like a normal CPU plan because AQE is on, but once you actually run the query, the UI shows the correct GPU final plan.

Explain plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [q#20L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(q#20L DESC NULLS LAST, 2), ENSURE_REQUIREMENTS, [id=#39]
      +- HashAggregate(keys=[s_date#28], functions=[sum(cast(s_quantity#27 as bigint))], output=[s_date#28, q#20L])
         +- Exchange hashpartitioning(s_date#28, 2), ENSURE_REQUIREMENTS, [id=#36]
            +- HashAggregate(keys=[s_date#28], functions=[partial_sum(cast(s_quantity#27 as bigint))], output=[s_date#28, sum#32L])
               +- FileScan parquet aqe_demo_db.sales[s_quantity#27,s_date#28] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s_quantity:int,s_date:date>

Final Plan from UI:

== Physical Plan ==
AdaptiveSparkPlan (15)
+- == Final Plan ==
   GpuColumnarToRow (14)
   +- GpuSort (13)
      +- GpuCoalesceBatches (12)
         +- GpuShuffleCoalesce (11)
            +- GpuCustomShuffleReader (10)
               +- ShuffleQueryStage (9)
                  +- GpuColumnarExchange (8)
                     +- GpuHashAggregate (7)
                        +- GpuShuffleCoalesce (6)
                           +- GpuCustomShuffleReader (5)
                              +- ShuffleQueryStage (4)
                                 +- GpuColumnarExchange (3)
                                    +- GpuHashAggregate (2)
                                       +- GpuScan parquet aqe_demo_db.sales (1)

Stage 0's Shuffle Write Size: Avg 3.2 KB, 30 tasks (a huge decrease due to columnar processing)

Stage 2's Shuffle Read Size: 97.5 KB, 1 task

Basically, GPU mode produces much smaller shuffle files, which results in much less shuffle write and read.

References:

Adaptive Query Execution: Speeding Up Spark SQL at Runtime (Databricks blog)
Spark SQL Performance Tuning guide (Apache Spark documentation)
