Wednesday, February 17, 2021

Spark Tuning -- Understanding the Spill from a Cartesian Product

Goal:

This article explains how to understand the spilling from a Cartesian Product.

We will explain the meaning of the 2 parameters below, as well as the metrics "Shuffle Spill (Memory)" and "Shuffle Spill (Disk)" on the webUI.

  • spark.sql.cartesianProductExec.buffer.in.memory.threshold
  • spark.sql.cartesianProductExec.buffer.spill.threshold

Env:

Spark 2.4.4

Code Analysis:

Cartesian Product is one of the join types mentioned in my previous post -- Spark Tuning -- explaining Spark SQL Join Types.

1. The code logic is inside CartesianProductExec.scala, which creates UnsafeCartesianRDD, an optimized CartesianRDD for UnsafeRow.

UnsafeCartesianRDD takes 2 input variables -- inMemoryBufferThreshold and spillThreshold -- which map to the 2 parameters below (a quick sketch of overriding them follows the list):

  • spark.sql.cartesianProductExec.buffer.in.memory.threshold (default: 4096)
    Threshold for number of rows guaranteed to be held in memory by the cartesian product operator.
  • spark.sql.cartesianProductExec.buffer.spill.threshold (default: 2147483647)
    Threshold for number of rows to be spilled by the cartesian product operator.
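
For a quick experiment, both can be overridden at runtime. Below is a minimal sketch, assuming a Scala spark-shell session (the Test Design section later does the same thing with SQL SET commands); the values 100 and 1000 are purely illustrative:

// Minimal sketch: override the two Cartesian Product buffer thresholds for an experiment.
// These values are illustrative only, not tuning recommendations.
spark.conf.set("spark.sql.cartesianProductExec.buffer.in.memory.threshold", "100")
spark.conf.set("spark.sql.cartesianProductExec.buffer.spill.threshold", "1000")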


2. UnsafeCartesianRDD uses ExternalAppendOnlyUnsafeRowArray which is an append-only array for [[UnsafeRow]]s.

ExternalAppendOnlyUnsafeRowArray takes input variables (numRowsInMemoryBufferThreshold and numRowsSpillThreshold) which also map to the above 2 parameters:

/**
 * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
 * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
 * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
 * excessive memory consumption). Setting these threshold involves following trade-offs:
 *
 * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
 *   than is available, resulting in OOM.
 * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
 *   excessive disk writes. This may lead to a performance regression compared to the normal case
 *   of using an [[ArrayBuffer]] or [[Array]].
 */
private[sql] class ExternalAppendOnlyUnsafeRowArray(
    taskMemoryManager: TaskMemoryManager,
    blockManager: BlockManager,
    serializerManager: SerializerManager,
    taskContext: TaskContext,
    initialSize: Int,
    pageSizeBytes: Long,
    numRowsInMemoryBufferThreshold: Int,
    numRowsSpillThreshold: Int) extends Logging {


3. ExternalAppendOnlyUnsafeRowArray uses UnsafeExternalSorter and passes one of the above variables (numRowsSpillThreshold) into it:

if (numRows < numRowsInMemoryBufferThreshold) {
  inMemoryBuffer += unsafeRow.copy()
} else {
  if (spillableArray == null) {
    logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
      s"${classOf[UnsafeExternalSorter].getName}")

    // We will not sort the rows, so prefixComparator and recordComparator are null
    spillableArray = UnsafeExternalSorter.create(
      taskMemoryManager,
      blockManager,
      serializerManager,
      taskContext,
      null,
      null,
      initialSize,
      pageSizeBytes,
      numRowsSpillThreshold,
      false)

Here we see the role of the first parameter -- spark.sql.cartesianProductExec.buffer.in.memory.threshold.

Once the number of rows reaches spark.sql.cartesianProductExec.buffer.in.memory.threshold, it switches to a spillable array by creating an UnsafeExternalSorter.

At that point, you should see an INFO message in the executor log such as:

INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 100 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

Note: each task could print the above message at most once in the executor log.

After that, the parameter "spark.sql.cartesianProductExec.buffer.spill.threshold" is passed into UnsafeExternalSorter to create the spillable array.
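
To make the interplay of the two thresholds concrete, here is a highly simplified Scala sketch (my own illustration, not Spark's actual implementation): rows go into a plain in-memory array until the in-memory threshold is reached, and afterwards into a "spillable" path that flushes every spill-threshold rows. spillToDisk() below is just a stand-in for what UnsafeExternalSorter really does:

import scala.collection.mutable.ArrayBuffer

// Simplified illustration of the two-threshold behavior described above.
// Not Spark code: spillToDisk() only counts and clears, while Spark writes real
// spill files under the blockmgr-* directories.
class SimplifiedRowBuffer[T](inMemoryThreshold: Int, spillThreshold: Int) {
  private val inMemoryBuffer = ArrayBuffer.empty[T]
  private val pendingRows    = ArrayBuffer.empty[T]
  private var spillCount     = 0

  def add(row: T): Unit = {
    if (inMemoryBuffer.size < inMemoryThreshold) {
      inMemoryBuffer += row                                  // cheap path: plain in-memory array
    } else {
      pendingRows += row                                     // "spillable" path
      if (pendingRows.size >= spillThreshold) spillToDisk()
    }
  }

  private def spillToDisk(): Unit = {
    spillCount += 1
    println(s"spill #$spillCount: flushing ${pendingRows.size} rows")
    pendingRows.clear()
  }
}

For example, feeding 500 rows into a SimplifiedRowBuffer(100, 50) prints 8 spills: the first 100 rows stay in memory and the remaining 400 rows are flushed 50 at a time. This mirrors the two kinds of log messages discussed in this section.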


4. UnsafeExternalSorter is responsible for the actual spilling work:

/**
 * Force this sorter to spill when there are this many elements in memory.
 */
private final int numElementsForSpillThreshold;


if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
  logger.info("Spilling data because number of spilledRecords crossed the threshold " +
    numElementsForSpillThreshold);
  spill();
}

Basically, it spills once for every spark.sql.cartesianProductExec.buffer.spill.threshold rows buffered.

So the smaller this parameter is, the more often spill() happens, as estimated in the sketch below.
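
As a rough sanity check (my own back-of-the-envelope estimate, which ignores the in-memory buffer portion and any final flush), the number of spills per task is roughly the number of rows cached from the right side divided by the spill threshold:

// Rough estimate only; real spill counts also depend on the in-memory buffer and memory pressure.
val rowsCachedPerTask   = 4000L   // e.g. roughly 28646 right-side rows split across 7 partitions
val spillThreshold      = 50L     // spark.sql.cartesianProductExec.buffer.spill.threshold
val approxSpillsPerTask = rowsCachedPerTask / spillThreshold   // ~80 spills per task

With 14 tasks that is on the order of 1,100 spills in total, which is in the same ballpark as the spill counts measured in the test results later in this post.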

Whenever "spill()" is called, you should see INFO message from executor log such as: 

INFO UnsafeExternalSorter: Spilling data because number of spilledRecords crossed the threshold 10

Note: each task could print the above message many times (once per spill()) in the executor log.

 

5. spill() is a method of UnsafeExternalSorter, and it prints the message below:

    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
spillWriters.size(),
spillWriters.size() > 1 ? " times" : " time");

So whenever "spill()" is called, you should see below INFO message from executor log such as:

INFO UnsafeExternalSorter: Thread 504 spilling sort data of 32.0 MB to disk (15  times so far)

Note: each task could print the above message many times (once per spill()) in the executor log.

Note: the 32.0 MB is the in-memory data size for that spill(), which is related to the metric "Shuffle Spill (Memory)" on the webUI. I will explain it in detail later.

Test Design:

Here we will use the TPC-DS sample data shared by this post.

In this post we just want to do a Cartesian Product between the tables "customer" and "store_sales".

Let me first explain some Spark parameters I set to make the test behave the way I want:

1. set spark.sql.crossJoin.enabled=true;

This has to be enabled so that a Cartesian Product is allowed.

2. set spark.sql.autoBroadcastJoinThreshold=1;

This is to disable Broadcast Nested Loop Join (BNLJ) so that a Cartesian Product will be chosen.

3. set spark.sql.files.maxPartitionBytes=1342177280;

As we know, a Cartesian Product will spawn N*M tasks.

(N = number of partitions on the left side, M = number of partitions on the right side)

So I increased spark.sql.files.maxPartitionBytes to make N=2 and M=7.

As a result, the total number of tasks in the tests below is 2*7=14.

Note: you can control the number of tasks to be whatever value you like, as long as you have a large enough cluster.

4. set spark.sql.cartesianProductExec.buffer.in.memory.threshold=100;

Basically I just decrease it to a value small enough to make sure every task triggers the switch to UnsafeExternalSorter.

5. Then I will test different values for spark.sql.cartesianProductExec.buffer.spill.threshold from 50 to 12800. 

6. The sample query is:

select count(*) from 
(
select c.c_current_addr_sk,s.ss_sold_time_sk as c2,s.ss_cdemo_sk as c3 from customer c, store_sales s
where c.c_customer_sk % 1000 = 2
and s.ss_ticket_number % 1000 = 1
);

Since my tables "customer" and "store_sales" are too large, I used some filters to limit the size of both the left and right sides.

Here are the left and right sides' size:

spark-sql> select count(*) from store_sales where ss_ticket_number % 1000 = 1;
28646
spark-sql> select count(*) from customer where c_customer_sk % 1000 = 2;
500

So we know that the above sample query will eventually produce 28646*500=14323000 rows.

7. Enable INFO message in log4j settings for executors

By default in the Spark 2.4.4 on MapR 6.1 environment, the log4j root logger is set to WARN.

Since we need to look into the executor logs, we need to enable INFO messages in log4j.properties:

log4j.rootCategory=INFO, console

After that, start spark-sql using the options below:

spark-sql --master yarn --deploy-mode client --files /opt/mapr/spark/spark-2.4.4/conf/log4j.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"

8. Disable spark.unsafe.sorter.spill.read.ahead.enabled to avoid bug SPARK-30225 in Spark 2.4.4

SPARK-30225 is fixed in 2.4.5 and 3.0.0.

Since our tests run on Spark 2.4.4, we need to disable the read-ahead of unsafe spill as a workaround; otherwise the Spark job will fail with the error "java.io.IOException: Stream is corrupted" when spilling.
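
Before collecting any numbers, it is also worth confirming that the planner really picked a Cartesian Product. Here is a small sketch in Scala (the spark-shell equivalent of the spark-sql session above, assuming the same TPC-DS tables are available to the session):

// Sanity check: the printed physical plan should contain "CartesianProduct".
val q = """select c.c_current_addr_sk, s.ss_sold_time_sk as c2, s.ss_cdemo_sk as c3
          |from customer c, store_sales s
          |where c.c_customer_sk % 1000 = 2
          |  and s.ss_ticket_number % 1000 = 1""".stripMargin
spark.sql(q).explain()                            // inspect the physical plan
spark.sql(s"select count(*) from ($q)").show()    // then run the actual test query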

Metrics Monitoring:

From the webUI, we may see the metrics "Shuffle Spill (Memory)" and "Shuffle Spill (Disk)" if spills happen.

But what do they mean?

From TaskMetrics.scala, we know:

"Shuffle Spill (Memory)" is "The number of in-memory bytes spilled by this task".

"Shuffle Spill (Disk)" is "The number of on-disk bytes spilled by this task".

So I collected and monitored the above metrics from the webUI to see how they relate to the actual disk size used by spilling.

That is the reason why I designed this test to match theory with practice.
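
These two columns are backed by the memoryBytesSpilled and diskBytesSpilled fields of TaskMetrics. As an alternative to reading the webUI, a sketch of collecting them programmatically with a SparkListener (registered from a Scala session) could look like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print per-task spill metrics as tasks finish; these are the numbers the webUI aggregates.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null && m.memoryBytesSpilled > 0) {
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"memoryBytesSpilled=${m.memoryBytesSpilled} B, diskBytesSpilled=${m.diskBytesSpilled} B")
    }
  }
})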

Let me first explain how I collected/calculated the following items in the spreadsheet shared later:

1. "Total Shuffle Spill (Memory) in GB" and "Total Shuffle Spill (Disk) in KB"

These are the aggregated stage-level metrics shown on the webUI.

2. "Total # of spills"

I searched for the keyword "spilling sort data" in each executor log and then summed the counts manually.

For example, in the YARN container log for each executor (I am using Spark on YARN in this test):

[root@h2 container_e03_1611617415539_0134_01_000002]# strings stderr |grep "spilling sort data" |wc -l
226

3. "Size of each spill file in B"

I tried to capture the real spill file size on each node:

[root@h2 blockmgr-2e25a3fc-0be1-4f92-9914-393183b1f511]#  pwd
/tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1611617415539_0134/blockmgr-2e25a3fc-0be1-4f92-9914-393183b1f511
[root@h2 blockmgr-2e25a3fc-0be1-4f92-9914-393183b1f511]# while true; do ls -altr */temp*; done

Note: you should know where the spill directory is on each node. By default it is under /tmp, in a sub-directory whose name starts with "blockmgr-".

For example, the output below means each spill file is 167 bytes:

[root@h4 blockmgr-9c48b3db-9f67-41ec-ad6e-bd6c11901a19]# ls -altr */temp*
-rw-r----- 1 mapr mapr 167 Feb 16 16:48 21/temp_local_5d582ec3-52a0-4b30-b501-7b39d7c3c593
-rw-r----- 1 mapr mapr 167 Feb 16 16:48 2f/temp_local_ebb4cebb-15bd-466a-b383-e3584659662a

4. "Sum(Spill size) in KB"

In my spreadsheet, this is calculated as "Total # of spills" * "Size of each spill file in B", converted to KB.

Basically I just want to compare it with "Total Shuffle Spill (Disk) in KB" to see if they match.

Test Results:

 1. When spark.shuffle.spill.compress=true (Default)

spark.sql.cartesianProductExec.buffer.spill.threshold | Total Shuffle Spill (Memory) in GB | Total Shuffle Spill (Disk) in KB | Total # of spills | Size of each spill file in B | Sum(Spill size) in KB
50    | 35.7   | 83.6  | 1142 | 75  | 83.64257813
100   | 17.7   | 42.5  | 566  | 77  | 42.56054688
200   | 8.78   | 22.4  | 280  | 82  | 22.421875
800   | 2.06   | 7.05  | 66   | 110 | 7.08984375
1600  | 0.93   | 4.3   | 30   | 148 | 4.3359375
3200  | 0.25   | 1.99  | 8    | 255 | 1.9921875
6400  | 0.0625 | 0.85  | 2    | 436 | 0.8515625
12800 | 0      | 0     | 0    | 0   | 0

This tells us that:

  • The webUI metric "Shuffle Spill (Disk)" matches the real total size of the spill files.
  • The webUI metric "Shuffle Spill (Memory)" increases and decreases in step with "Shuffle Spill (Disk)".
  • The lower the value of spark.sql.cartesianProductExec.buffer.spill.threshold, the more memory is needed for spilling. In the worst case during my tests, with spark.sql.cartesianProductExec.buffer.spill.threshold=10, OutOfMemory happened because the memory needed for spilling was larger than the total executor memory. This interesting result tells us that even though spilling is meant to save memory for certain operators, too many spills may also cause memory pressure or even OOM errors.
  • If we divide "Total Shuffle Spill (Memory) in GB" by "Total # of spills", we see that each spill needs about 32 MB of memory. This matches the INFO message we saw in the executor log; a quick check is sketched below.
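
As a quick check of that last bullet, using plain Scala arithmetic and the numbers copied from the first row of the table above:

// Total "Shuffle Spill (Memory)" divided by the number of spills gives roughly 32 MB per spill,
// matching the "spilling sort data of 32.0 MB" log line seen earlier.
val totalSpillMemoryGB = 35.7
val totalSpills        = 1142
val perSpillMB         = totalSpillMemoryGB * 1024 / totalSpills   // ≈ 32.0 MB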

 2. When spark.shuffle.spill.compress=false 

spark.sql.cartesianProductExec.buffer.spill.threshold | Total Shuffle Spill (Memory) in GB | Total Shuffle Spill (Disk) in KB | Total # of spills | Size of each spill file in B | Sum(Spill size) in KB
50    | 35.7   | 691.4 | 1142 | 620   | 691.4453125
100   | 17.7   | 674.3 | 566  | 1220  | 674.3359375
200   | 8.8    | 661.7 | 280  | 2420  | 661.71875
800   | 2.1    | 620   | 66   | 9620  | 620.0390625
1600  | 0.9375 | 563.1 | 30   | 19220 | 563.0859375
3200  | 0.25   | 300.2 | 8    | 38420 | 300.15625
6400  | 0.0625 | 150   | 2    | 76820 | 150.0390625
12800 | 0      | 0     | 0    | 0     | 0

This tells us that:

  • Whether spill compression is on or off, the "Total Shuffle Spill (Memory) in GB" does not change.
  • Without compression of the spill files, each spill file is much larger than before.
  • We can also derive the compression ratio between the above 2 result sets: at threshold 50, for example, it is roughly 691.4 KB / 83.6 KB ≈ 8:1.

 3. When spark.shuffle.spill.compress=false and also increasing the right side by 1000x.

What I mean by "increasing the right side by 1000x" is removing the filter on the right side table store_sales ("s.ss_ticket_number % 1000 = 1"). Now the right side has 28800501 rows.

The new query looks as below:

select count(*) from
(
select c.c_current_addr_sk,s.ss_sold_time_sk as c2,s.ss_cdemo_sk as c3 from customer c, store_sales s
where c.c_customer_sk % 1000 = 2
); 

spark.sql.cartesianProductExec.buffer.spill.threshold | Total Shuffle Spill (Memory) in GB | Total Shuffle Spill (Disk) in KB | Total # of spills | Size of each spill file in B | Sum(Spill size) in KB
400000 | 4.3 | 652390.4 | 138 | 4800020 | 646877.6953



 

Then let's swap the left and right side and do the same test:

select count(*) from
(
select c.c_current_addr_sk,s.ss_sold_time_sk as c2,s.ss_cdemo_sk as c3 from store_sales s, customer c
where c.c_customer_sk % 1000 = 2
);

spark.sql.cartesianProductExec.buffer.spill.threshold | Total Shuffle Spill (Memory) in GB | Total Shuffle Spill (Disk) in KB | Total # of spills | Size of each spill file in B | Sum(Spill size) in KB
400000 | 0 | 0 | 0 | 0 | 0


Now we understand that the order of the left and right sides in a Cartesian Product matters: as CartesianProductExec.scala explains, it "will cache the rows from second child RDD", and that is where the spilling happens.

  • If we do not want too much spilling, make sure the smaller side is on the right side (see the sketch below).
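
With the DataFrame API the same idea can be sketched as below (my own illustration; the table names and the customer filter come from the test setup above). Since CartesianProductExec buffers and spills its second (right) child, the argument order of crossJoin decides which side gets cached:

// Keep the smaller input as the right-hand side of the cross join to minimize spilling.
val smallDF = spark.table("customer").where("c_customer_sk % 1000 = 2")   // ~500 rows
val bigDF   = spark.table("store_sales")                                  // ~28.8M rows

bigDF.crossJoin(smallDF).count()   // right side is small: little or no spill expected
smallDF.crossJoin(bigDF).count()   // right side is big: heavy buffering and spilling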

4. Jstack output on executor when spilling

"Executor task launch worker for task 1852" #119 daemon prio=5 os_prio=0 tid=0x00007fb88870c000 nid=0xc7c runnable [0x00007fb84c2e9000]
java.lang.Thread.State: RUNNABLE
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:701)
at java.lang.StringBuilder.append(StringBuilder.java:214)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:264)
at org.apache.spark.SparkConf$$anonfun$getSizeAsBytes$3.apply$mcJ$sp(SparkConf.scala:327)
at org.apache.spark.SparkConf$$anonfun$getSizeAsBytes$3.apply(SparkConf.scala:327)
at org.apache.spark.SparkConf$$anonfun$getSizeAsBytes$3.apply(SparkConf.scala:327)
at org.apache.spark.SparkConf.catchIllegalValue(SparkConf.scala:484)
at org.apache.spark.SparkConf.getSizeAsBytes(SparkConf.scala:326)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:64)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:156)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getIterator(UnsafeExternalSorter.java:647)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.generateIterator(ExternalAppendOnlyUnsafeRowArray.scala:171)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.generateIterator(ExternalAppendOnlyUnsafeRowArray.scala:175)
at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.org$apache$spark$sql$execution$joins$UnsafeCartesianRDD$$createIter$1(CartesianProductExec.scala:49)
at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD$$anonfun$1.apply(CartesianProductExec.scala:53)
at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD$$anonfun$1.apply(CartesianProductExec.scala:52)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.org$apache$spark$executor$Executor$TaskRunner$$anonfun$$res$1(Executor.scala:412)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:419)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1359)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:430)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This jstack output matches our theory from the code analysis.

All in all, I hope this post explains the meaning of the above 2 Cartesian-Product-spill-related parameters, as well as the 2 spill metrics on the webUI.

 
