Thursday, March 18, 2021

Spark Tuning -- Adaptive Query Execution(3): Dynamically optimizing skew joins

Goal:

This article explains Adaptive Query Execution (AQE)'s "Dynamically optimizing skew joins" feature, introduced in Spark 3.0.

This is a follow-up article to Spark Tuning -- Adaptive Query Execution(1): Dynamically coalescing shuffle partitions, and Spark Tuning -- Adaptive Query Execution(2): Dynamically switching join strategies.

Env:

Spark 3.0.2

Concept:

This article focuses on the 3rd feature of AQE: "Dynamically optimizing skew joins".

As the Spark Performance Tuning guide describes:

This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. 

The diagram from the Databricks blog illustrates this well:

The following 2 parameters determine a "skewed partition". A partition has to meet both of the conditions below:

  • a. Its partition size > spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size"
  • b. Its partition size > spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB)
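As a concrete illustration, the two conditions above can be expressed as a small Python check. This is a sketch using the default configuration values; `is_skewed` is a hypothetical helper, not a Spark API:

```python
def is_skewed(partition_size, median_size,
              skewed_partition_factor=10,
              skewed_partition_threshold=256 * 1024 * 1024):
    """A partition is considered skewed only when BOTH conditions hold:
    (a) size > factor * median partition size, and
    (b) size > skewedPartitionThresholdInBytes."""
    return (partition_size > skewed_partition_factor * median_size
            and partition_size > skewed_partition_threshold)

# Sizes taken from the debug log later in this article:
# the ~6GB partition is skewed, a median-sized one is not.
print(is_skewed(6517549080, 13972650))  # True
print(is_skewed(13972650, 13972650))    # False
```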

The source code of this feature is in org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.

Before doing the tests below, we can enable log4j DEBUG logging for the above class so that the sizes of the partitions are printed. For example, we can put the line below in log4j.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin=DEBUG
Then ask the executor to use this log4j file:
spark.executor.extraJavaOptions '-Dlog4j.configuration=$SPARK_HOME/conf/log4j.properties'

Solution:

The Databricks blog "Adaptive Query Execution: Speeding Up Spark SQL at Runtime" includes a pretty good demo notebook, which I will use for the following tests. The query that contains skewed data is:

use aqe_demo_db;

SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON i_item_id = s_item_id
GROUP BY s_date
ORDER BY total_sales DESC;

1. AQE off

This is the default run without AQE. Query duration is 6.4min in my test lab.

Because data skew exists in the "sales" table on "s_item_id=100" (80% of the data), the default run results in a long-running SortMergeJoin (SMJ).

One task in the shuffle phase reads 5.8GB of shuffle data while the other 199 tasks read only 14.3MB on average. It also results in huge spilling to disk.

Spilling monitoring:

$  pwd
/tmp/spark-40b20c3b-7f04-4cc4-9134-7d64be53f919/executor-473da0f7-2d70-4565-91e7-ba5f3ea12a8a/blockmgr-78d16f8e-fc7b-4190-a08e-f96f57aabf97
$ find . -name "*.*"
.
./20/shuffle_2_91_0.index
./20/shuffle_2_219_0.data
./34/shuffle_2_170_0.index
./34/shuffle_2_192_0.index
...

jstack on the executor process also shows:

"Executor task launch worker for task 90.0 in stage 2.0 (TID 124)" #54 daemon prio=5 os_prio=0 cpu=222822.75ms elapsed=237.59s tid=0x00007f81000d2000 nid=0xe50 runnable  [0x00007f8150e18000]
java.lang.Thread.State: RUNNABLE
at net.jpountz.xxhash.XXHashJNI.XXH32_update(Native Method)
at net.jpountz.xxhash.StreamingXXHash32JNI.update(StreamingXXHash32JNI.java:67)
- locked <0x0000000735011230> (a net.jpountz.xxhash.StreamingXXHash32JNI)
at net.jpountz.xxhash.StreamingXXHash32$1.update(StreamingXXHash32.java:119)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:206)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:260)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:136)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:544)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:228)
at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:208)
- locked <0x0000000581600ea8> (a org.apache.spark.memory.TaskMemoryManager)
at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:289)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:95)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:361)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:417)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:455)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.findNextInnerJoinRows$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:774)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$539/0x00000008404f9440.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.10/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.10/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.10/Thread.java:834)

2. AQE on

set spark.sql.adaptive.enabled=true;

This is the default run with AQE on. Query duration is 2.4min in my test lab.

The debug log below shows that the skewed partition size is about 6GB, and that AQE split it into 30 parts:

Optimizing skewed join.
Left side partitions size info:
median size: 13972650, max size: 6517549080, min size: 13972650, avg size: 46499052
Right side partitions size info:
median size: 1549072, max size: 1549072, min size: 1549072, avg size: 1549072

DEBUG OptimizeSkewedJoin: Left side partition 23 (6 GB) is skewed, split it into 30 parts.
DEBUG OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
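The splitting itself can be sketched as follows. This is a simplified, hypothetical model (`split_by_target` is not a Spark API): OptimizeSkewedJoin walks the per-mapper shuffle block sizes of the skewed partition and groups consecutive blocks so that each resulting sub-partition stays near a target size; the real rule's target-size calculation and mapper-granularity details are more involved.

```python
def split_by_target(map_block_sizes, target_size):
    """Greedily group consecutive map-output blocks into sub-partitions,
    starting a new group once the current one would exceed target_size.
    Spark can only split at mapper boundaries, so each group is a
    contiguous range of map outputs."""
    groups, start, current = [], 0, 0
    for i, size in enumerate(map_block_sizes):
        if current > 0 and current + size > target_size:
            groups.append((start, i))  # half-open range [start, i)
            start, current = i, 0
        current += size
    groups.append((start, len(map_block_sizes)))
    return groups

# 10 equal 100-byte blocks with a 250-byte target -> 5 groups of 2.
print(split_by_target([100] * 10, 250))
```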

An extra "CustomShuffleReader" node also shows the skewed partition information.

This stage has 81 partitions: 51 normal partitions + 30 split skewed partitions.

This means that if AQE had not triggered this skew optimization, the original partition count would have been 52 (51 normal + the 1 skewed partition). Remember this number, 52, because it will show up later.
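The partition arithmetic can be checked directly (plain arithmetic, nothing Spark-specific):

```python
normal_partitions = 51
skewed_splits = 30  # the single skewed partition, split into 30 parts

total_with_aqe = normal_partitions + skewed_splits
total_without_aqe = normal_partitions + 1  # the skewed partition, unsplit

print(total_with_aqe)     # 81
print(total_without_aqe)  # 52
```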

3. AQE on + increased spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=6517549081;

Query duration is 6.6min in my test lab.

Here I am testing spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, so I set it to the above max partition size + 1. The goal is to not trigger the skew join optimization.
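The arithmetic is simply the max partition size plus one byte, so condition (b) can never be satisfied (a throwaway sketch, not a Spark API):

```python
# Max left-side partition size reported in the debug log.
max_partition_size = 6517549080

# One byte above the max guarantees no partition can exceed the
# threshold, so the skew join optimization is never triggered.
threshold = max_partition_size + 1
print(threshold)  # 6517549081
```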

The debug log below shows that no skewed partition was found:

DEBUG OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 13972650, max size: 6517549080, min size: 13972650, avg size: 46499052
Right side partitions size info:
median size: 1549072, max size: 1549072, min size: 1549072, avg size: 1549072

DEBUG OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
DEBUG OptimizeSkewedJoin: OptimizeSkewedJoin rule is not applied due to additional shuffles will be introduced.

Now, from the UI, we can see the "CustomShuffleReader" spawns only 52 partitions:

4. GPU Mode with AQE off

Now let's try the same query using the RAPIDS Accelerator for Apache Spark (current release 0.3) + Spark.

Query duration is 26s in my test lab. (Yes, only 26s without AQE on!)

No debug log is printed since AQE is off.

GPU mode triggers the GPU version of ShuffleHashJoin (SHJ), which is super fast even without AQE:

There are only 2 partitions/tasks for the shuffle stage.

From the Stage-20 metrics below we can see that, even though there is huge data skew, the skewed task took only 15s to complete, thanks to the Apache Arrow columnar memory format.

5. GPU Mode with AQE on

set spark.sql.adaptive.enabled = true;

Query duration is 25s in my test lab. 

The debug log below shows different partition size statistics under GPU mode compared to CPU mode:

DEBUG OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 3645779055, max size: 6266874120, min size: 1024683990, avg size: 3645779055
Right side partitions size info:
median size: 112912836, max size: 112912836, min size: 112912836, avg size: 112912836

DEBUG OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
DEBUG OptimizeSkewedJoin: OptimizeSkewedJoin rule is not applied due to additional shuffles will be introduced.

This is because GPU mode does not have SMJ implemented yet as of today, so this AQE feature cannot apply here. That is why no skewed partition is reported and it still uses the GPU version of ShuffleHashJoin. (Note also that, with the statistics above, the max left-side size is less than 10 × the median size, so condition (a) for a skewed partition is not met either.)

However, the query plan is a little different here, and AQE does spawn 2 extra "GpuCustomShuffleReader" nodes:
