Thursday, February 4, 2021

Spark Tuning -- How to use SparkMeasure to measure Spark job metrics

Goal:

This article explains how to use SparkMeasure to measure Spark job metrics.

Env:

Spark 2.4.4 with Scala 2.11.12 

SparkMeasure 0.17

Concept:

SparkMeasure is a handy tool for collecting aggregated stage-level or task-level metrics for Spark jobs and queries. Under the hood, it registers customized Spark listeners that capture the metrics as stages and tasks complete.

Note: Collecting at the task level has additional performance overhead compared to collecting at the stage level. Unless you need to study task skew, I would suggest collecting at the stage level.
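
To illustrate what "customized Spark listeners" means, below is a minimal sketch (my own simplified example, not SparkMeasure's actual implementation) of a listener that accumulates a couple of stage-level metrics:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Accumulates a few stage-level metrics as stages complete.
class SimpleStageMetricsListener extends SparkListener {
  var totalExecutorRunTime = 0L
  var totalRecordsRead = 0L

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val metrics = stageCompleted.stageInfo.taskMetrics   // metrics aggregated over the stage's tasks
    totalExecutorRunTime += metrics.executorRunTime
    totalRecordsRead += metrics.inputMetrics.recordsRead
  }
}

// Register it on the running SparkContext, e.g. in spark-shell:
// spark.sparkContext.addSparkListener(new SimpleStageMetricsListener)

SparkMeasure follows the same idea, but covers the full set of metrics and exposes convenient APIs to report and export them.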

To see where those metrics come from, we can look into the Spark source code under the "core/src/main/scala/org/apache/spark/executor" folder.

You will find the explanation of each metric inside TaskMetrics.scala, ShuffleReadMetrics.scala, ShuffleWriteMetrics.scala, etc.

For example:

/**
 * Time the executor spends actually running the task (including fetching shuffle data).
 */
def executorRunTime: Long = _executorRunTime.sum

/**
 * CPU Time the executor spends actually running the task
 * (including fetching shuffle data) in nanoseconds.
 */
def executorCpuTime: Long = _executorCpuTime.sum
Alternatively, you can find the explanation of those metrics in the Spark documentation.

Installation:

In this post, we will test with spark-shell and spark-submit, so we only need to follow this doc to build or download the jar file.

Note: Before downloading/building the jar, make sure it matches your Spark and Scala versions.

a. Download the Jar from Maven Central

For example, based on my Spark and Scala versions, I chose the following build:

wget https://repo1.maven.org/maven2/ch/cern/sparkmeasure/spark-measure_2.11/0.17/spark-measure_2.11-0.17.jar
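
Alternatively, if your nodes can reach Maven Central at runtime, you can let Spark fetch the dependency for you with the --packages option (same Maven coordinates as the wget above):

spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17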

b. Build the Jar using sbt from source code

git clone https://github.com/lucacanali/sparkmeasure
cd sparkmeasure
sbt +package
ls -l target/scala-2.11/spark-measure*.jar # location of the compiled jar
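
Note that sbt +package cross-builds for all Scala versions configured in the project. If you only need one, you can also build for a specific Scala version with sbt's ++ syntax (assuming that version is configured in the project's build), for example:

sbt ++2.11.12 package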

Solution:

In this post, we will use the sample data and queries from another post "Predicate Pushdown for Parquet".

1.  Interactive Mode using spark-shell for a single job/query

Please refer to this doc for details on Interactive Mode with spark-shell.
spark-shell --jars spark-measure_2.11-0.17.jar --master yarn --deploy-mode client --executor-memory 1G --num-executors 4

Stage metrics:

val stageMetrics = new ch.cern.sparkmeasure.StageMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
stageMetrics.runAndMeasure(sql(q1).show)

Output:

21/02/04 15:08:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 15:08:55 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics
Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 4
elapsedTime => 3835 (4 s)
stageDuration => 3827 (4 s)
executorRunTime => 4757 (5 s)
executorCpuTime => 3672 (4 s)
executorDeserializeTime => 772 (0.8 s)
executorDeserializeCpuTime => 510 (0.5 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 239 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0
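
Besides printing a report, the collected metrics can be exported for offline analysis. The sparkMeasure API provides methods to turn the collected data into a DataFrame and save it; below is a sketch based on the upstream documentation (worth double-checking the method signatures against your sparkMeasure version):

// Expose the collected stage metrics as a DataFrame and save it (JSON by default)
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
stageMetrics.saveData(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test")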

Task Metrics:

val taskMetrics = new ch.cern.sparkmeasure.TaskMetrics(spark)
val q1 = "SELECT * FROM readdf WHERE Index=20000"
taskMetrics.runAndMeasure(spark.sql(q1).show)

Output:

21/02/04 16:52:59 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
21/02/04 16:52:59 WARN TaskMetrics: Stage metrics data refreshed into temp view PerfTaskMetrics

Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark task metrics:
numtasks => 4
elapsedTime => 3896 (4 s)
duration => 5268 (5 s)
schedulerDelayTime => 94 (94 ms)
executorRunTime => 4439 (4 s)
executorCpuTime => 3561 (4 s)
executorDeserializeTime => 734 (0.7 s)
executorDeserializeCpuTime => 460 (0.5 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 237 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
gettingResultTime => 0 (0 ms)
resultSize => 2183 (2.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149260233 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0

2.  Interactive Mode using spark-shell for multiple jobs/queries

Take stage metrics as an example:

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark) 
stageMetrics.begin()

//Run multiple jobs/queries
val q1 = "SELECT * FROM readdf WHERE Index=20000"
val q2 = "SELECT * FROM readdf where Index=9999999999"
spark.sql(q1).show()
spark.sql(q2).show()

stageMetrics.end()
stageMetrics.printReport()

Output:

21/02/04 17:00:59 WARN StageMetrics: Stage metrics data refreshed into temp view PerfStageMetrics

Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 4
numTasks => 8
elapsedTime => 3242 (3 s)
stageDuration => 1094 (1 s)
executorRunTime => 1779 (2 s)
executorCpuTime => 942 (0.9 s)
executorDeserializeTime => 96 (96 ms)
executorDeserializeCpuTime => 37 (37 ms)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 42 (42 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 0 (0 ms)
resultSize => 5441 (5.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 6240991
bytesRead => 149305675 (142.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 0
shuffleTotalBlocksFetched => 0
shuffleLocalBlocksFetched => 0
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 0 (0 Bytes)
shuffleLocalBytesRead => 0 (0 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 0 (0 Bytes)
shuffleRecordsWritten => 0

Furthermore, the command below prints additional accumulable metrics (including SQL metrics):

scala> stageMetrics.printAccumulables()
21/02/04 17:01:26 WARN StageMetrics: Accumulables metrics data refreshed into temp view AccumulablesStageMetrics

Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 943 (0.9 s)
executorDeserializeCpuTime => 39 (39 ms)
executorDeserializeTime => 96 (96 ms)
executorRunTime => 1779 (2 s)
input.bytesRead => 149305675 (142.0 MB)
input.recordsRead => 6240991
jvmGCTime => 42 (42 ms)
resultSerializationTime => 1 (1 ms)
resultSize => 12780 (12.0 KB)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

146, duration total => 1422 (1 s)
147, number of output rows => 18
148, number of output rows => 6240991
151, scan time total => 1359 (1 s)
202, duration total => 200 (0.2 s)
207, scan time total => 198 (0.2 s)
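
If you prefer one report per query instead of a single aggregated report, you can simply wrap each query in its own measurement using the same begin/end/printReport calls shown above, for example:

// Measure q1 and q2 separately, each with its own StageMetrics instance
val m1 = ch.cern.sparkmeasure.StageMetrics(spark)
m1.begin()
spark.sql(q1).show()
m1.end()
m1.printReport()

val m2 = ch.cern.sparkmeasure.StageMetrics(spark)
m2.begin()
spark.sql(q2).show()
m2.end()
m2.printReport()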

3.  Flight Recorder Mode

Please refer to this doc for Flight Recorder Mode.

This mode does not require touching your code/program; you only need to add the jar file and a few configurations when submitting the job.

Take stage metrics as an example:

spark-submit --conf spark.driver.extraClassPath=./spark-measure_2.11-0.17.jar  \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.outputFormat=json \
--conf spark.sparkmeasure.outputFilename="/tmp/stageMetrics_flightRecorder" \
--conf spark.sparkmeasure.printToStdout=false \
--class "PredicatePushdownTest" \
--master yarn \
~/sbt/SparkScalaExample/target/scala-2.11/sparkscalaexample_2.11-1.0.jar

The job output will include a line like this:

WARN FlightRecorderStageMetrics: Writing Stage Metrics data serialized as json to /tmp/stageMetrics_flightRecorder

The JSON output file looks like this:

$ cat /tmp/stageMetrics_flightRecorder
[ {
"jobId" : 0,
"jobGroup" : null,
"stageId" : 0,
"name" : "load at PredicatePushdownTest.scala:16",
"submissionTime" : 1612488772250,
"completionTime" : 1612488773352,
"stageDuration" : 1102,
"numTasks" : 1,
"executorRunTime" : 352,
"executorCpuTime" : 141,
"executorDeserializeTime" : 589,
"executorDeserializeCpuTime" : 397,
"resultSerializationTime" : 3,
"jvmGCTime" : 95,
"resultSize" : 1969,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 0,
"bytesRead" : 0,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
}, {
"jobId" : 1,
"jobGroup" : null,
"stageId" : 1,
"name" : "collect at PredicatePushdownTest.scala:25",
"submissionTime" : 1612488774600,
"completionTime" : 1612488776522,
"stageDuration" : 1922,
"numTasks" : 4,
"executorRunTime" : 4962,
"executorCpuTime" : 4446,
"executorDeserializeTime" : 1679,
"executorDeserializeCpuTime" : 1215,
"resultSerializationTime" : 2,
"jvmGCTime" : 309,
"resultSize" : 7545,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 6240991,
"bytesRead" : 149260233,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
}, {
"jobId" : 2,
"jobGroup" : null,
"stageId" : 2,
"name" : "collect at PredicatePushdownTest.scala:30",
"submissionTime" : 1612488776656,
"completionTime" : 1612488776833,
"stageDuration" : 177,
"numTasks" : 4,
"executorRunTime" : 427,
"executorCpuTime" : 261,
"executorDeserializeTime" : 89,
"executorDeserializeCpuTime" : 27,
"resultSerializationTime" : 0,
"jvmGCTime" : 0,
"resultSize" : 5884,
"diskBytesSpilled" : 0,
"memoryBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"recordsRead" : 0,
"bytesRead" : 45442,
"recordsWritten" : 0,
"bytesWritten" : 0,
"shuffleFetchWaitTime" : 0,
"shuffleTotalBytesRead" : 0,
"shuffleTotalBlocksFetched" : 0,
"shuffleLocalBlocksFetched" : 0,
"shuffleRemoteBlocksFetched" : 0,
"shuffleLocalBytesRead" : 0,
"shuffleRemoteBytesRead" : 0,
"shuffleRemoteBytesReadToDisk" : 0,
"shuffleRecordsRead" : 0,
"shuffleWriteTime" : 0,
"shuffleBytesWritten" : 0,
"shuffleRecordsWritten" : 0
} ]
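
Since the flight recorder output is plain JSON, it can be loaded back into Spark (or any other tool) for further analysis. For example, in spark-shell (the multiLine option is needed because the file is a pretty-printed JSON array):

val metricsDF = spark.read.option("multiLine", true).json("/tmp/stageMetrics_flightRecorder")
metricsDF.orderBy("jobId", "stageId").show()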

References:

On Measuring Apache Spark Workload Metrics for Performance Troubleshooting

Example analysis of Spark metrics collected with sparkMeasure

