Wednesday, March 17, 2021

Spark Tuning -- Adaptive Query Execution(2): Dynamically switching join strategies

Goal:

This article explains Adaptive Query Execution (AQE)'s "Dynamically switching join strategies" feature introduced in Spark 3.0. 

This is a follow-up article to Spark Tuning -- Adaptive Query Execution(1): Dynamically coalescing shuffle partitions.

Env:

Spark 3.0.2

Concept:

This article focuses on the 2nd AQE feature, "Dynamically switching join strategies".

As the Spark Performance Tuning guide describes:

AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. 

This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic(if spark.sql.adaptive.localShuffleReader.enabled is true)
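For reference, below is a minimal sketch of the configurations that drive this behavior, shown with their Spark 3.0 defaults (check your own environment before relying on these values):

set spark.sql.adaptive.enabled=false;                   -- master switch for AQE; off by default in Spark 3.0.x
set spark.sql.autoBroadcastJoinThreshold=10485760;      -- 10 MB; a join side below this size at runtime triggers the SMJ -> BHJ switch
set spark.sql.adaptive.localShuffleReader.enabled=true; -- read shuffle files locally after the switch to save network traffic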

Solution:

The Databricks blog "Adaptive Query Execution: Speeding Up Spark SQL at Runtime" has a pretty good demo notebook, which I will use for the following tests.
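In case you do not have that notebook handy, here is a hypothetical minimal sketch of the two demo tables it creates (column names and types are taken from the ReadSchema in the plans below; the data-generation step is omitted):

CREATE DATABASE IF NOT EXISTS aqe_demo_db;
USE aqe_demo_db;
CREATE TABLE sales (s_item_id INT, s_quantity INT, s_date DATE) USING parquet;
CREATE TABLE items (i_item_id BIGINT, i_price INT) USING parquet;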

1. AQE off (default)
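AQE is off by default in Spark 3.0.x, so no extra setting is needed for this baseline; to make it explicit you could run:

set spark.sql.adaptive.enabled=false;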

EXPLAIN cost
SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;

The explain plan:

== Optimized Logical Plan ==
Sort [total_sales#10L DESC NULLS LAST], true, Statistics(sizeInBytes=368.1 PiB)
+- Aggregate [s_date#18], [s_date#18, sum(cast((s_quantity#17 * i_price#20) as bigint)) AS total_sales#10L], Statistics(sizeInBytes=368.1 PiB)
   +- Project [s_quantity#17, s_date#18, i_price#20], Statistics(sizeInBytes=368.1 PiB)
      +- Join Inner, (cast(s_item_id#16 as bigint) = i_item_id#19L), Statistics(sizeInBytes=589.0 PiB)
         :- Filter isnotnull(s_item_id#16), Statistics(sizeInBytes=3.7 GiB)
         :  +- Relation[s_item_id#16,s_quantity#17,s_date#18] parquet, Statistics(sizeInBytes=3.7 GiB)
         +- Filter ((isnotnull(i_price#20) AND (i_price#20 < 10)) AND isnotnull(i_item_id#19L)), Statistics(sizeInBytes=157.6 MiB)
            +- Relation[i_item_id#19L,i_price#20] parquet, Statistics(sizeInBytes=157.6 MiB)

== Physical Plan ==
*(7) Sort [total_sales#10L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(total_sales#10L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#109]
   +- *(6) HashAggregate(keys=[s_date#18], functions=[sum(cast((s_quantity#17 * i_price#20) as bigint))], output=[s_date#18, total_sales#10L])
      +- Exchange hashpartitioning(s_date#18, 200), ENSURE_REQUIREMENTS, [id=#105]
         +- *(5) HashAggregate(keys=[s_date#18], functions=[partial_sum(cast((s_quantity#17 * i_price#20) as bigint))], output=[s_date#18, sum#24L])
            +- *(5) Project [s_quantity#17, s_date#18, i_price#20]
               +- *(5) SortMergeJoin [cast(s_item_id#16 as bigint)], [i_item_id#19L], Inner
                  :- *(2) Sort [cast(s_item_id#16 as bigint) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(cast(s_item_id#16 as bigint), 200), ENSURE_REQUIREMENTS, [id=#87]
                  :     +- *(1) Filter isnotnull(s_item_id#16)
                  :        +- *(1) ColumnarToRow
                  :           +- FileScan parquet aqe_demo_db.sales[s_item_id#16,s_quantity#17,s_date#18] Batched: true, DataFilters: [isnotnull(s_item_id#16)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>
                  +- *(4) Sort [i_item_id#19L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(i_item_id#19L, 200), ENSURE_REQUIREMENTS, [id=#96]
                        +- *(3) Filter ((isnotnull(i_price#20) AND (i_price#20 < 10)) AND isnotnull(i_item_id#19L))
                           +- *(3) ColumnarToRow
                              +- FileScan parquet aqe_demo_db.items[i_item_id#19L,i_price#20] Batched: true, DataFilters: [isnotnull(i_price#20), (i_price#20 < 10), isnotnull(i_item_id#19L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/items], PartitionFilters: [], PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: struct<i_item_id:bigint,i_price:int>

 From the "Optimized Logical Plan", the estimated size of smaller side table "items" after filter "i_price<10" is 157.6MB which is larger than the default spark.sql.autoBroadcastJoinThreshold (10MB). As a result, a Sort Merge Join(SMJ) is chosen.

When we check the Spark UI after the query finishes, we find that the actual size of the "smaller" join side is only 6.9 MB, which means the estimate is not very accurate:

As we know, normally the most performant join type is Broadcast Hash Join (BHJ), provided one side is small enough to be broadcast.

In this case, how can we make Spark smart enough to change the plan from SMJ to BHJ at runtime? AQE is here to help.

2.  AQE on

set spark.sql.adaptive.enabled=true;

After AQE is turned on, the query plan does not change much, except that the physical plan is now wrapped in an "AdaptiveSparkPlan" node (with isFinalPlan=false before execution):

== Optimized Logical Plan ==
Sort [total_sales#35L DESC NULLS LAST], true, Statistics(sizeInBytes=368.1 PiB)
+- Aggregate [s_date#18], [s_date#18, sum(cast((s_quantity#17 * i_price#20) as bigint)) AS total_sales#35L], Statistics(sizeInBytes=368.1 PiB)
   +- Project [s_quantity#17, s_date#18, i_price#20], Statistics(sizeInBytes=368.1 PiB)
      +- Join Inner, (cast(s_item_id#16 as bigint) = i_item_id#19L), Statistics(sizeInBytes=589.0 PiB)
         :- Filter isnotnull(s_item_id#16), Statistics(sizeInBytes=3.7 GiB)
         :  +- Relation[s_item_id#16,s_quantity#17,s_date#18] parquet, Statistics(sizeInBytes=3.7 GiB)
         +- Filter ((isnotnull(i_price#20) AND (i_price#20 < 10)) AND isnotnull(i_item_id#19L)), Statistics(sizeInBytes=157.6 MiB)
            +- Relation[i_item_id#19L,i_price#20] parquet, Statistics(sizeInBytes=157.6 MiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [total_sales#35L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(total_sales#35L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#177]
      +- HashAggregate(keys=[s_date#18], functions=[sum(cast((s_quantity#17 * i_price#20) as bigint))], output=[s_date#18, total_sales#35L])
         +- Exchange hashpartitioning(s_date#18, 200), ENSURE_REQUIREMENTS, [id=#174]
            +- HashAggregate(keys=[s_date#18], functions=[partial_sum(cast((s_quantity#17 * i_price#20) as bigint))], output=[s_date#18, sum#44L])
               +- Project [s_quantity#17, s_date#18, i_price#20]
                  +- SortMergeJoin [cast(s_item_id#16 as bigint)], [i_item_id#19L], Inner
                     :- Sort [cast(s_item_id#16 as bigint) ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(cast(s_item_id#16 as bigint), 200), ENSURE_REQUIREMENTS, [id=#166]
                     :     +- Filter isnotnull(s_item_id#16)
                     :        +- FileScan parquet aqe_demo_db.sales[s_item_id#16,s_quantity#17,s_date#18] Batched: true, DataFilters: [isnotnull(s_item_id#16)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>
                     +- Sort [i_item_id#19L ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(i_item_id#19L, 200), ENSURE_REQUIREMENTS, [id=#167]
                           +- Filter ((isnotnull(i_price#20) AND (i_price#20 < 10)) AND isnotnull(i_item_id#19L))
                              +- FileScan parquet aqe_demo_db.items[i_item_id#19L,i_price#20] Batched: true, DataFilters: [isnotnull(i_price#20), (i_price#20 < 10), isnotnull(i_item_id#19L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/items], PartitionFilters: [], PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: struct<i_item_id:bigint,i_price:int>

While the query is running, if we check the UI, it initially still shows an SMJ.

As I mentioned in the post Spark Tuning -- explaining Spark SQL Join Types, an SMJ actually has 3 steps -- shuffle, sort and merge.

So after the shuffle is done, Spark realizes that the smaller side of the join is actually only 6.9 MB, which is below the default spark.sql.autoBroadcastJoinThreshold (10 MB). As a result, AQE changes the plan from SMJ to BHJ at runtime.

Since the shuffle is already done (otherwise Spark would not know the real size of the smaller side), this is why the tuning guide says "This is not as efficient as planning a broadcast hash join in the first place".

But it still avoids the remaining steps of SMJ -- sort and merge -- so it should be better than a complete SMJ.

Since the shuffle writes are already done and the remaining work is just a BHJ, Spark is smart enough to fetch the data from those shuffle files in "local mode", because spark.sql.adaptive.localShuffleReader.enabled is true by default.

So in the UI, you will find extra "CustomShuffleReader" nodes in local mode, which avoid network traffic:


The graph below is from this blog and explains the local shuffle reader:

Also, the # of partitions from the local shuffle reader = the # of upstream map tasks.

In our case, they are 30 and 4. (I will compare these numbers with the next test.)

3. AQE on but local shuffle reader is disabled

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.localShuffleReader.enabled=false;

This is only for testing purposes; normally we should not disable the local shuffle reader.

The reason I disable it here is to show the difference in shuffle reader statistics compared to test #2:

 

Now it is shown as "CustomShuffleReader coalesced".

Also, the # of partitions changed from 30 and 4 to 52 and 5.
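The "coalesced" reader comes from the other AQE feature -- dynamically coalescing shuffle partitions -- covered in part (1). For reference, the knobs that control it in Spark 3.0, shown with their defaults, are:

set spark.sql.adaptive.coalescePartitions.enabled=true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB;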

4. GPU Mode with AQE on 

Now let's try the same query using the RAPIDS Accelerator for Apache Spark (current release 0.3) + Spark to see what the query plan looks like in GPU mode.
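Here I assume the RAPIDS Accelerator jars are already on the classpath and the plugin is registered via spark.plugins=com.nvidia.spark.SQLPlugin; with that in place, GPU SQL execution can be toggled at runtime:

set spark.rapids.sql.enabled=true;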

The explain plan output looks the same as the CPU plan, but do not worry, the actual executed plan is a GPU plan:

== Optimized Logical Plan ==
Sort [total_sales#20L DESC NULLS LAST], true, Statistics(sizeInBytes=368.1 PiB)
+- Aggregate [s_date#28], [s_date#28, sum(cast((s_quantity#27 * i_price#30) as bigint)) AS total_sales#20L], Statistics(sizeInBytes=368.1 PiB)
   +- Project [s_quantity#27, s_date#28, i_price#30], Statistics(sizeInBytes=368.1 PiB)
      +- Join Inner, (cast(s_item_id#26 as bigint) = i_item_id#29L), Statistics(sizeInBytes=589.0 PiB)
         :- Filter isnotnull(s_item_id#26), Statistics(sizeInBytes=3.7 GiB)
         :  +- Relation[s_item_id#26,s_quantity#27,s_date#28] parquet, Statistics(sizeInBytes=3.7 GiB)
         +- Filter ((isnotnull(i_price#30) AND (i_price#30 < 10)) AND isnotnull(i_item_id#29L)), Statistics(sizeInBytes=157.6 MiB)
            +- Relation[i_item_id#29L,i_price#30] parquet, Statistics(sizeInBytes=157.6 MiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [total_sales#20L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(total_sales#20L DESC NULLS LAST, 2), ENSURE_REQUIREMENTS, [id=#73]
      +- HashAggregate(keys=[s_date#28], functions=[sum(cast((s_quantity#27 * i_price#30) as bigint))], output=[s_date#28, total_sales#20L])
         +- Exchange hashpartitioning(s_date#28, 2), ENSURE_REQUIREMENTS, [id=#70]
            +- HashAggregate(keys=[s_date#28], functions=[partial_sum(cast((s_quantity#27 * i_price#30) as bigint))], output=[s_date#28, sum#34L])
               +- Project [s_quantity#27, s_date#28, i_price#30]
                  +- SortMergeJoin [cast(s_item_id#26 as bigint)], [i_item_id#29L], Inner
                     :- Sort [cast(s_item_id#26 as bigint) ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(cast(s_item_id#26 as bigint), 2), ENSURE_REQUIREMENTS, [id=#62]
                     :     +- Filter isnotnull(s_item_id#26)
                     :        +- FileScan parquet aqe_demo_db.sales[s_item_id#26,s_quantity#27,s_date#28] Batched: true, DataFilters: [isnotnull(s_item_id#26)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/sales], PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>
                     +- Sort [i_item_id#29L ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(i_item_id#29L, 2), ENSURE_REQUIREMENTS, [id=#63]
                           +- Filter ((isnotnull(i_price#30) AND (i_price#30 < 10)) AND isnotnull(i_item_id#29L))
                              +- FileScan parquet aqe_demo_db.items[i_item_id#29L,i_price#30] Batched: true, DataFilters: [isnotnull(i_price#30), (i_price#30 < 10), isnotnull(i_item_id#29L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/warehouse/aqe_demo_db.db/items], PartitionFilters: [], PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: struct<i_item_id:bigint,i_price:int>

If we actually run this query, here is the final plan shown in the UI:

The key things to look at here are the "GpuCustomShuffleReader local" nodes and the # of local shuffle partitions (30 and 4), which matches the # of upstream map tasks.

Note that in GPU mode, all the data sizes are smaller than in CPU mode.

For example, the smaller side of the join in GPU mode is now only 3.4 MB:


This means that even if we set spark.sql.autoBroadcastJoinThreshold=4194304 (4 MB), the join will still be converted to a BHJ under AQE.
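In SQL that would be (4194304 bytes = 4 MB, still above the 3.4 MB runtime size of the build side):

set spark.sql.adaptive.enabled=true;
set spark.sql.autoBroadcastJoinThreshold=4194304;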

And the shuffle write/read sizes are also smaller than in CPU mode.

Reference:

 
