Tuesday, April 20, 2021

Spark Code -- Dig into SparkListenerEvent

Goal:

This article digs into the different types of SparkListenerEvent in the Spark event log, with examples.

Understanding them can help us know how to parse the Spark event log.

Env:

Apache Spark 3.1.1 source code

Solution:

WARNING: this article walks through all of the SparkListenerEvent types below with examples from a Spark event log. It contains a lot of Apache Spark source code analysis, so if you do not like reading a bunch of source code, you can stop now.

As we know, the Spark event log can be rendered nicely in the Spark History Server (SHS) UI. So why would we try to parse the Spark event log manually?

The answer is that the SHS only surfaces a small portion of the event log. There is a lot of good stuff inside the Spark event log, such as task metrics, SQL plan node accumulables, etc.

Basically, the event log is a file of JSON lines, where each line is serialized from a Scala case class that extends a trait (interface) called "SparkListenerEvent". Those definitions are inside core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala.

Spark has its own EventLogFileReader classes, which are backward compatible, so we do not need to write a JSON parser for these objects ourselves. One reason is that our own parser could become outdated if the event log format changes in future Spark versions.

So if our goal is to parse the event log, we can learn from how the SHS parses it. The logic is inside FsHistoryProvider.scala:

Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs))

If we used "jq" to format the event log in a human readable format, you can find the details of each json object. 

Now let's look into each of the 21 types of SparkListenerEvent below:

Some of them are simple and straightforward, but others take more effort to understand: in particular, 6 different types of events work together to handle SQL plan accumulables, and AQE-related events may override the query plan obtained from earlier events.

  1. SparkListenerLogStart
  2. SparkListenerResourceProfileAdded
  3. SparkListenerBlockManagerAdded
  4. SparkListenerBlockManagerRemoved
  5. SparkListenerEnvironmentUpdate
  6. SparkListenerTaskStart
  7. SparkListenerApplicationStart
  8. SparkListenerExecutorAdded
  9. SparkListenerExecutorRemoved
  10. SparkListenerSQLExecutionStart
  11. SparkListenerSQLExecutionEnd
  12. SparkListenerDriverAccumUpdates
  13. SparkListenerJobStart
  14. SparkListenerStageSubmitted
  15. SparkListenerTaskEnd
  16. SparkListenerStageCompleted
  17. SparkListenerJobEnd
  18. SparkListenerTaskGettingResult
  19. SparkListenerApplicationEnd
  20. SparkListenerSQLAdaptiveExecutionUpdate
  21. SparkListenerSQLAdaptiveSQLMetricUpdates

1. SparkListenerLogStart

Sample json object:

{
"Event": "SparkListenerLogStart",
"Spark Version": "3.1.1"
}

Case class definition:

case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

Very straightforward: we can get the Spark version from it.

2. SparkListenerResourceProfileAdded

Sample json object:

{
"Event": "SparkListenerResourceProfileAdded",
"Resource Profile Id": 0,
"Executor Resource Requests": {
"cores": {
"Resource Name": "cores",
"Amount": 16,
"Discovery Script": "",
"Vendor": ""
},
"memory": {
"Resource Name": "memory",
"Amount": 81920,
"Discovery Script": "",
"Vendor": ""
},
"offHeap": {
"Resource Name": "offHeap",
"Amount": 0,
"Discovery Script": "",
"Vendor": ""
},
"gpu": {
"Resource Name": "gpu",
"Amount": 1,
"Discovery Script": "/xxx/xxx/xxx/xxx/examples/src/main/scripts/getGpusResources.sh",
"Vendor": ""
}
},
"Task Resource Requests": {
"cpus": {
"Resource Name": "cpus",
"Amount": 1
},
"gpu": {
"Resource Name": "gpu",
"Amount": 0.25
}
}
}

Case class definition:

case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
extends SparkListenerEvent

What is ResourceProfile?

class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest])

What are ExecutorResourceRequest and TaskResourceRequest?

class ExecutorResourceRequest(
val resourceName: String,
val amount: Long,
val discoveryScript: String = "",
val vendor: String = "") extends Serializable {
...

class TaskResourceRequests() extends Serializable {
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava
def cpus(amount: Int): this.type = {
def resource(resourceName: String, amount: Double): this.type = {
...

After some digging, we know SparkListenerResourceProfileAdded contains the executor and task resource requests such as CPU, memory, GPU, etc.

The GPU resource is a little harder to get, because we need to look it up in a Map instead of reading it directly from a dedicated field or method.
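For illustration, a hedged json4s sketch that pulls the executor GPU amount out of a SparkListenerResourceProfileAdded line could look like the following; the field names come from the sample JSON above, and the Option covers resource profiles that have no "gpu" entry.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Extract the executor GPU request (if any) from one
// SparkListenerResourceProfileAdded JSON line.
def executorGpuAmount(eventLine: String): Option[Long] = {
  parse(eventLine) \ "Executor Resource Requests" \ "gpu" \ "Amount" match {
    case JInt(v)  => Some(v.toLong)
    case JLong(v) => Some(v)
    case _        => None   // this resource profile has no "gpu" request
  }
}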

3. SparkListenerBlockManagerAdded

Sample json object:

{
"Event": "SparkListenerBlockManagerAdded",
"Block Manager ID": {
"Executor ID": "driver",
"Host": "myhostname",
"Port": 44159
},
"Maximum Memory": 3032481792,
"Timestamp": 1618341863606,
"Maximum Onheap Memory": 3032481792,
"Maximum Offheap Memory": 0
}

Case class definition:

case class SparkListenerBlockManagerAdded(
time: Long,
blockManagerId: BlockManagerId,
maxMem: Long,
maxOnHeapMem: Option[Long] = None,
maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
}

What is BlockManagerId (in BlockManagerId.scala)?

class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
private var topologyInfo_ : Option[String])
extends Externalizable {

SparkListenerBlockManagerAdded contains an executor's block manager information such as executorId, hostname, port, and maximum memory sizes.

4. SparkListenerBlockManagerRemoved

Sample json object:

{
"Event": "SparkListenerBlockManagerRemoved",
"Block Manager ID": {
"Executor ID": "1",
"Host": "myhostname",
"Port": 12345
},
"Timestamp": 1111111111111
}

Case class definition:

case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)

SparkListenerBlockManagerRemoved contains the timestamp when an executor gets removed.

Normally it means an executor failed with some error, and we may see it appear together with SparkListenerExecutorRemoved.

5. SparkListenerEnvironmentUpdate

Sample json object:

{
"Event": "SparkListenerEnvironmentUpdate",
"JVM Information": {
"Java Home": "/xxx/xxx/xxx/envs/xxx",
"Java Version": "11.0.9.1-internal (Oracle Corporation)",
"Scala Version": "version 2.12.10"
},
"Spark Properties": {
"spark.rapids.sql.exec.CollectLimitExec": "true",
"spark.executor.resource.gpu.amount": "1",
"spark.rapids.sql.concurrentGpuTasks": "1",
...
}
"Hadoop Properties": {
"yarn.resourcemanager.amlauncher.thread-count": "50",
"dfs.namenode.resource.check.interval": "5000",
...
}
"System Properties": {
"java.io.tmpdir": "/tmp",
"line.separator": "\n",
...
}
"Classpath Entries": {
"/home/xxx/spark/jars/curator-framework-2.7.1.jar": "System Classpath",
"/home/xxx/spark/jars/parquet-encoding-1.10.1.jar": "System Classpath",
"/home/xxx/spark/jars/commons-dbcp-1.4.jar": "System Classpath",
...
}
}

Case class definition:

case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
extends SparkListenerEvent

SparkListenerEnvironmentUpdate carries a Map which contains the Spark/Hadoop/System/... properties.

It is useful for doing parameter checks, as in the sketch below.
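As an illustration (not Spark API), here is a small json4s sketch that turns the "Spark Properties" section of this event into a Map so a single parameter can be checked; the property name in the usage comment is only an example.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Look up one Spark property from a SparkListenerEnvironmentUpdate JSON line.
def sparkProperty(eventLine: String, key: String): Option[String] = {
  implicit val formats: Formats = DefaultFormats
  (parse(eventLine) \ "Spark Properties").extractOpt[Map[String, String]]
    .flatMap(_.get(key))
}

// e.g. sparkProperty(line, "spark.sql.adaptive.enabled")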

6. SparkListenerTaskStart

Sample json object:

{
"Event": "SparkListenerTaskStart",
"Stage ID": 0,
"Stage Attempt ID": 0,
"Task Info": {
"Task ID": 0,
"Index": 0,
"Attempt": 0,
"Launch Time": 1618341870400,
"Executor ID": "0",
"Host": "111.111.111.111",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 0,
"Failed": false,
"Killed": false,
"Accumulables": []
}
}

Case class definition:

case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)

What is TaskInfo?

class TaskInfo(
val taskId: Long,
/**
* The index of this task within its task set. Not necessarily the same as the ID of the RDD
* partition that the task is computing.
*/
val index: Int,
val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
val taskLocality: TaskLocality.TaskLocality,
val speculative: Boolean) {

SparkListenerTaskStart contains the task launch time and the related executor information.

Note: normally the Accumulables list is empty when a task starts.

7. SparkListenerApplicationStart

Sample json object:

{
"Event": "SparkListenerApplicationStart",
"App Name": "Spark Pi",
"App ID": "app-20210413122423-0000",
"Timestamp": 1618341862473,
"User": "xxxx"
}

Case class definition:

case class SparkListenerApplicationStart(
appName: String,
appId: Option[String],
time: Long,
sparkUser: String,
appAttemptId: Option[String],
driverLogs: Option[Map[String, String]] = None,
driverAttributes: Option[Map[String, String]] = None) extends SparkListenerEvent

SparkListenerApplicationStart contains the application start time, application name, application ID and user name.

Normally there is only one such event in each event log.

8. SparkListenerExecutorAdded

Sample json object:

{
"Event": "SparkListenerExecutorAdded",
"Timestamp": 1618341865601,
"Executor ID": "0",
"Executor Info": {
"Host": "111.111.111.111",
"Total Cores": 16,
"Log Urls": {
"stdout": "http://111.111.111.111:8081/logPage/?appId=app-20210413122423-0000&executorId=0&logType=stdout",
"stderr": "http://111.111.111.111:8081/logPage/?appId=app-20210413122423-0000&executorId=0&logType=stderr"
},
"Attributes": {},
"Resources": {
"gpu": {
"name": "gpu",
"addresses": [
"0"
]
}
},
"Resource Profile Id": 0
}
}

Case class definition:

case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)

What is ExecutorInfo?

class ExecutorInfo(
val executorHost: String,
val totalCores: Int,
val logUrlMap: Map[String, String],
val attributes: Map[String, String],
val resourcesInfo: Map[String, ResourceInformation],
val resourceProfileId: Int) {

SparkListenerExecutorAdded contains the timestamp and the executor information.

Note that it references a resource profile by its Resource Profile Id.

9. SparkListenerExecutorRemoved

Sample json object:

{
"Event": "SparkListenerExecutorRemoved",
"Timestamp": 1111111111111,
"Executor ID": "1",
"Removed Reason": "Container from a bad node: container_1111111111111_1111_11_111111 on host: abc.abc.abc.abc"
}

Case class definition:

case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)

SparkListenerExecutorRemoved contains the timestamp and the reason why an executor gets removed.

Normally it means an executor failed for some reason, such as OOM.

10. SparkListenerSQLExecutionStart

Sample json object:

{
"Event": "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart",
"executionId": 3,
"description": "select count(*) from customer a, customer b where a.c_customer_id=b.c_customer_id+10",
"details": "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:566)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)",
"physicalPlanDescription": "== Physical Plan ==\nGpuColumnarToRow (14)\n+- GpuHashAggregate (13)\n +- GpuShuffleCoalesce (12)\n +- GpuColumnarExchange (11)\n +- GpuHashAggregate (10)\n +- GpuProject (9)\n +- GpuBroadcastHashJoin (8)\n :- GpuCoalesceBatches (3)\n : +- GpuFilter (2)\n : +- GpuScan parquet tpcds.customer (1)\n +- GpuBroadcastExchange (7)\n +- GpuCoalesceBatches (6)\n +- GpuFilter (5)\n +- GpuScan parquet tpcds.customer (4)\n\n\n(1) GpuScan parquet tpcds.customer\nOutput [1]: [c_customer_id#2]\nBatched: true\nLocation: InMemoryFileIndex [file:/home/xxxxx/data/tpcds_100G_parquet/customer]\nPushedFilters: [IsNotNull(c_customer_id)]\nReadSchema: struct<c_customer_id:string>\n\n(2) GpuFilter\nInput [1]: [c_customer_id#2]\nArguments: gpuisnotnull(c_customer_id#2)\n\n(3) GpuCoalesceBatches\nInput [1]: [c_customer_id#2]\nArguments: TargetSize(2147483647)\n\n(4) GpuScan parquet tpcds.customer\nOutput [1]: [c_customer_id#27]\nBatched: true\nLocation: InMemoryFileIndex [file:/home/xxxxx/data/tpcds_100G_parquet/customer]\nPushedFilters: [IsNotNull(c_customer_id)]\nReadSchema: struct<c_customer_id:string>\n\n(5) GpuFilter\nInput [1]: [c_customer_id#27]\nArguments: gpuisnotnull(c_customer_id#27)\n\n(6) GpuCoalesceBatches\nInput [1]: [c_customer_id#27]\nArguments: TargetSize(2147483647)\n\n(7) GpuBroadcastExchange\nInput [1]: [c_customer_id#27]\nArguments: HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero((cast(input[0, string, false] as double) + 10.0)))),false), [id=#97]\n\n(8) GpuBroadcastHashJoin\nLeft output [1]: [c_customer_id#2]\nRight output [1]: [c_customer_id#27]\nArguments: [gpuknownfloatingpointnormalized(gpunormalizenanandzero(cast(c_customer_id#2 as double)))], [gpuknownfloatingpointnormalized(gpunormalizenanandzero((cast(c_customer_id#27 as double) + 10.0)))], Inner, GpuBuildRight\n\n(9) GpuProject\nInput [2]: [c_customer_id#2, c_customer_id#27]\n\n(10) GpuHashAggregate\nInput: []\nKeys: []\nFunctions [1]: [partial_gpucount(1)]\nAggregate Attributes [1]: [count#46L]\nResults [1]: [count#47L]\n\n(11) GpuColumnarExchange\nInput [1]: [count#47L]\nArguments: gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#101]\n\n(12) GpuShuffleCoalesce\nInput [1]: [count#47L]\nArguments: 2147483647\n\n(13) GpuHashAggregate\nInput [1]: [count#47L]\nKeys: []\nFunctions [1]: [gpucount(1)]\nAggregate Attributes [1]: [count(1)#25L]\nResults [1]: [count(1)#25L AS count(1)#44L]\n\n(14) GpuColumnarToRow\nInput [1]: [count(1)#44L]\nArguments: false\n\n",
"sparkPlanInfo": {
"nodeName": "GpuColumnarToRow",
"simpleString": "GpuColumnarToRow false",
"children": [
{
"nodeName": "GpuHashAggregate",
"simpleString": "GpuHashAggregate(keys=[], functions=[gpucount(1)]), filters=List(None))",
"children": [
...
"children": [
{
"nodeName": "GpuScan parquet tpcds.customer",
"simpleString": "GpuFileGpuScan parquet tpcds.customer[c_customer_id#2] Batched: true, DataFilters: [isnotnull(c_customer_id#2)], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxxxx/data/tpcds_100G_parquet/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id)], ReadSchema: struct<c_customer_id:string>",
"children": [],
"metadata": {},
"metrics": [
{
"name": "number of files read",
"accumulatorId": 209,
"metricType": "sum"
},

Case class definition:

case class SparkListenerSQLExecutionStart(
executionId: Long,
description: String,
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long)
extends SparkListenerEvent

What is SparkPlanInfo?

class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo]) {

What is SQLMetricInfo?

class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
val metricType: String)

Now we are getting to the complex part.

SparkListenerSQLExecutionStart contains the query plan and its accumulable (metric) definitions.

Remember that the query plan here may be overridden by the upcoming AQE-related event SparkListenerSQLAdaptiveExecutionUpdate.

And the accumulable (metric) definitions could be overridden by the upcoming AQE-related event SparkListenerSQLAdaptiveSQLMetricUpdates.

So none of them are final: when parsing this event, keep in mind that they may change later.

Note: the SQL plan accumulables are associated with their SQL plan node by nodeID!

For example, when the final parsing is done, it should show the mapping between SQL plan nodeID and accumulatorId:

+-----+------+------------+-------------+----------------------+---------+----------+
|sqlID|nodeID|nodeName    |accumulatorId|name                  |max_value|metricType|
+-----+------+------------+-------------+----------------------+---------+----------+
|11   |5     |Scan parquet|123          |number of output rows |11       |sum       |
|11   |5     |Scan parquet|124          |number of files read  |1        |sum       |
|11   |5     |Scan parquet|125          |metadata time         |1        |timing    |
|11   |5     |Scan parquet|126          |size of files read    |1111     |size      |
|11   |5     |Scan parquet|127          |scan time             |11       |timing    |
+-----+------+------------+-------------+----------------------+---------+----------+
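To get such a mapping from the raw event, one approach is to walk the sparkPlanInfo tree. The sketch below assigns node IDs with a simple depth-first counter, which is only an approximation of Spark's own numbering: the SHS builds a SparkPlanGraph, which treats wholeStageCodegen clusters and reused exchanges specially, so take this as an illustration of the structure rather than an exact reproduction.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// One row per metric found on a plan node.
case class PlanMetricRow(nodeId: Int, nodeName: String,
                         accumulatorId: Long, name: String, metricType: String)

// Walk the "sparkPlanInfo" tree of a SparkListenerSQLExecutionStart line and
// collect (nodeId, nodeName, accumulatorId, name, metricType) rows.
def planMetrics(eventLine: String): Seq[PlanMetricRow] = {
  implicit val formats: Formats = DefaultFormats
  var nextId = 0
  def walk(node: JValue): Seq[PlanMetricRow] = {
    val id = nextId
    nextId += 1
    val nodeName = (node \ "nodeName").extractOpt[String].getOrElse("")
    val metrics = node \ "metrics" match {
      case JArray(ms) => ms.map { m =>
        PlanMetricRow(id, nodeName,
          (m \ "accumulatorId").extract[Long],
          (m \ "name").extract[String],
          (m \ "metricType").extract[String])
      }
      case _ => Nil
    }
    val children = node \ "children" match {
      case JArray(cs) => cs.flatMap(walk)
      case _          => Nil
    }
    metrics ++ children
  }
  walk(parse(eventLine) \ "sparkPlanInfo")
}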

11. SparkListenerSQLExecutionEnd

Sample json object:

{
"Event": "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
"executionId": 0,
"time": 1617729547596
}

Case class definition:

case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)

Easy: it contains the SQL end timestamp. If we match it with the start time from the corresponding SparkListenerSQLExecutionStart, we can get the SQL duration in ms.
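A trivial sketch of that calculation, assuming the start and end timestamps have already been collected into maps keyed by executionId:

// SQL duration in ms per executionId, from the collected start and end timestamps.
def sqlDurations(startTimes: Map[Long, Long], endTimes: Map[Long, Long]): Map[Long, Long] =
  startTimes.flatMap { case (execId, start) =>
    endTimes.get(execId).map(end => execId -> (end - start))
  }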

12. SparkListenerDriverAccumUpdates

Sample json object:

{
"Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
"executionId": 2,
"accumUpdates": [
[
67,
1
],
[
68,
2
],
[
69,
106281839
]
]
}

Case class definition:

/**
 * @param executionId The execution id for a query, so we can find the query plan.
 * @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints).
 */

case class SparkListenerDriverAccumUpdates(
executionId: Long,
@JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
accumUpdates: Seq[(Long, Long)])

SparkListenerDriverAccumUpdates mainly sends the accumulator id => accumulator value pairs.

To figure out what each accumulator means, we need to join it with the SQLMetricInfo obtained from the earlier SparkListenerSQLExecutionStart and possibly from upcoming SparkListenerSQLAdaptiveSQLMetricUpdates events.

So we need to wait until all SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveSQLMetricUpdates events have been processed, and then match the accumulator id to get the accumulator name and its associated query plan node.
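A hedged sketch of that join, assuming the metric definitions have already been collected into a map keyed by accumulator id (the MetricDef case class here is just for illustration):

// Metric definition collected from SparkListenerSQLExecutionStart /
// SparkListenerSQLAdaptiveSQLMetricUpdates, keyed elsewhere by accumulator id.
case class MetricDef(name: String, metricType: String)

// Resolve the (accumulatorId, value) pairs of a SparkListenerDriverAccumUpdates
// event against the known metric definitions; unknown ids are dropped.
def resolveDriverAccums(
    metricDefs: Map[Long, MetricDef],
    accumUpdates: Seq[(Long, Long)]): Seq[(Long, MetricDef, Long)] =
  accumUpdates.flatMap { case (accId, value) =>
    metricDefs.get(accId).map(d => (accId, d, value))
  }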

13. SparkListenerJobStart

Sample json object:

{
"Event": "SparkListenerJobStart",
"Job ID": 0,
"Submission Time": 1617729577252,
"Stage Infos": [
{
"Stage ID": 0,
"Stage Attempt ID": 0,
"Stage Name": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Number of Tasks": 16,
"RDD Info": [
{
"RDD ID": 3,
"Name": "MapPartitionsRDD",
"Scope": "{\"id\":\"7\",\"name\":\"GpuColumnarExchange\"}",
"Callsite": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Parent IDs": [
2
],
"Storage Level": {
"Use Disk": false,
"Use Memory": false,
"Deserialized": false,
"Replication": 1
},
"Barrier": false,
"Number of Partitions": 16,
"Number of Cached Partitions": 0,
"Memory Size": 0,
"Disk Size": 0
},
...
"Accumulables": [],
"Resource Profile Id": 0
}
],
"Stage IDs": [
0,
1,
2
],
"Properties": {
"spark.rapids.sql.exec.CollectLimitExec": "true",
"spark.executor.resource.gpu.amount": "1",
"spark.rapids.sql.concurrentGpuTasks": "1",
...
}
}

Case class definition:

case class SparkListenerJobStart(
jobId: Int,
time: Long,
stageInfos: Seq[StageInfo],
properties: Properties = null)
extends SparkListenerEvent {
// Note: this is here for backwards-compatibility with older versions of this event which
// only stored stageIds and not StageInfos:
val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}

What is StageInfo?

class StageInfo(
val stageId: Int,
private val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
val taskMetrics: TaskMetrics = null,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
private[spark] val shuffleDepId: Option[Int] = None,
val resourceProfileId: Int) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None

/**
* Terminal values of accumulables updated during this stage, including all the user-defined
* accumulators.
*/
val accumulables = HashMap[Long, AccumulableInfo]()

SparkListenerJobStart carries the StageInfos, which contain the RDD information.

When a job starts, the event may also contain modified properties which override the application-level properties obtained from SparkListenerEnvironmentUpdate.

This means that, within the same application (event log), Spark parameters can change, so do not assume the parameters are static for the whole application.
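A one-line sketch of how such overriding can be modeled while parsing, assuming both property sets have already been flattened into Maps:

// Job-level properties (from SparkListenerJobStart) override the application-level
// properties (from SparkListenerEnvironmentUpdate) for that job.
def effectiveProperties(appLevel: Map[String, String],
                        jobLevel: Map[String, String]): Map[String, String] =
  appLevel ++ jobLevel   // keys present in jobLevel win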

14. SparkListenerStageSubmitted

Sample json object:

{
"Event": "SparkListenerStageSubmitted",
"Stage Info": {
"Stage ID": 1,
"Stage Attempt ID": 0,
"Stage Name": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Number of Tasks": 1000,
"RDD Info": [
{
"RDD ID": 8,
"Name": "MapPartitionsRDD",
"Scope": "{\"id\":\"3\",\"name\":\"GpuColumnarExchange\"}",
"Callsite": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Parent IDs": [
7
...
"Submission Time": 1617729578789,
"Accumulables": [],
"Resource Profile Id": 0
},
"Properties": {
"spark.rapids.sql.exec.CollectLimitExec": "true",
"spark.executor.resource.gpu.amount": "1",
...

Case class definition:

case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)

Similar to SparkListenerJobStart, the StageInfo is the key content here.

And again, properties could change here.

15. SparkListenerTaskEnd

Sample json object:

{
"Event": "SparkListenerTaskEnd",
"Stage ID": 1,
"Stage Attempt ID": 0,
"Task Type": "ShuffleMapTask",
"Task End Reason": {
"Reason": "Success"
},
"Task Info": {
"Task ID": 17,
"Index": 1,
"Attempt": 0,
"Launch Time": 1617729578802,
"Executor ID": "0",
"Host": "192.192.192.2",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 1617729578977,
"Failed": false,
"Killed": false,
"Accumulables": [
{
"ID": 21,
"Name": "output rows",
"Update": "10",
"Value": "10",
"Internal": true,
"Count Failed Values": true,
"Metadata": "sql"
},
...
},
"Task Executor Metrics": {
"JVMHeapMemory": 0,
"JVMOffHeapMemory": 0,
"OnHeapExecutionMemory": 0,
"OffHeapExecutionMemory": 0,
...
"Task Metrics": {
"Executor Deserialize Time": 73,
"Executor Deserialize CPU Time": 16058445,
"Executor Run Time": 92,
"Executor CPU Time": 59345832,
"Peak Execution Memory": 0,
"Result Size": 5303,
"JVM GC Time": 0,
"Result Serialization Time": 0,
"Memory Bytes Spilled": 0,
"Disk Bytes Spilled": 0,
"Shuffle Read Metrics": {
"Remote Blocks Fetched": 0,
"Local Blocks Fetched": 1,
"Fetch Wait Time": 0,
"Remote Bytes Read": 0,
"Remote Bytes Read To Disk": 0,
"Local Bytes Read": 20652,
"Total Records Read": 1
},
"Shuffle Write Metrics": {
"Shuffle Bytes Written": 86,
"Shuffle Write Time": 2697954,
"Shuffle Records Written": 1
},
"Input Metrics": {
"Bytes Read": 0,
"Records Read": 0
},
"Output Metrics": {
"Bytes Written": 0,
"Records Written": 0
},
"Updated Blocks": []
}
}

Case class definition:

case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
taskExecutorMetrics: ExecutorMetrics,
// may be null if the task has failed
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent

What is TaskMetrics?

class TaskMetrics private[spark] () extends Serializable {
// Each metric is internally represented as an accumulator
private val _executorDeserializeTime = new LongAccumulator
private val _executorDeserializeCpuTime = new LongAccumulator
private val _executorRunTime = new LongAccumulator
private val _executorCpuTime = new LongAccumulator
private val _resultSize = new LongAccumulator
private val _jvmGCTime = new LongAccumulator
private val _resultSerializationTime = new LongAccumulator
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]

SparkListenerTaskEnd may be the most important event if we want to profile performance based on the event log.

A Spark performance analysis tool normally aggregates these TaskMetrics at the stage, job, or SQL level.

From earlier events we can find the job <-> stage and SQL <-> job mappings; combined with the task <-> stage mapping from this event, we can easily join them and do the aggregation (a per-stage sketch follows below).

Note that this event also sends out lots of accumulables.

By now we can see how many of the events send and deal with accumulables.
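As a simple illustration of this kind of aggregation, here is a sketch that sums two TaskMetrics fields per stage from already-split SparkListenerTaskEnd lines; the field names follow the sample JSON above, and tasks whose Task Metrics are missing (e.g. failed tasks) simply contribute zero.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

case class StageAgg(tasks: Int, runTimeMs: Long, shuffleBytesWritten: Long)

// Sum a couple of TaskMetrics fields per stage from SparkListenerTaskEnd lines.
def aggregateByStage(taskEndLines: Seq[String]): Map[Int, StageAgg] = {
  implicit val formats: Formats = DefaultFormats
  taskEndLines.foldLeft(Map.empty[Int, StageAgg]) { (acc, line) =>
    val json    = parse(line)
    val stageId = (json \ "Stage ID").extract[Int]
    val metrics = json \ "Task Metrics"
    val runTime = (metrics \ "Executor Run Time").extractOpt[Long].getOrElse(0L)
    val shufWr  = (metrics \ "Shuffle Write Metrics" \ "Shuffle Bytes Written")
      .extractOpt[Long].getOrElse(0L)
    val prev = acc.getOrElse(stageId, StageAgg(0, 0L, 0L))
    acc.updated(stageId,
      StageAgg(prev.tasks + 1, prev.runTimeMs + runTime, prev.shuffleBytesWritten + shufWr))
  }
}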

16. SparkListenerStageCompleted

Sample json object:

{
"Event": "SparkListenerStageCompleted",
"Stage Info": {
"Stage ID": 0,
"Stage Attempt ID": 0,
"Stage Name": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Number of Tasks": 16,
"RDD Info": [
{
"RDD ID": 3,
"Name": "MapPartitionsRDD",
"Scope": "{\"id\":\"7\",\"name\":\"GpuColumnarExchange\"}",
"Callsite": "executeColumnar at GpuShuffleCoalesceExec.scala:67",
"Parent IDs": [
2
],
...
"Submission Time": 1617729577270,
"Completion Time": 1617729578759,
"Accumulables": [
{
"ID": 47,
"Name": "output rows",
"Value": "2000000",
"Internal": true,
"Count Failed Values": true,
"Metadata": "sql"
},
],
"Resource Profile Id": 0
}
}

Case class definition:

case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

Again, StageInfo is the key content, and again there are accumulables inside StageInfo.

17. SparkListenerJobEnd

Sample json object:

{
"Event": "SparkListenerJobEnd",
"Job ID": 0,
"Completion Time": 1617729581438,
"Job Result": {
"Result": "JobSucceeded"
}
}

Case class definition:

case class SparkListenerJobEnd(
jobId: Int,
time: Long,
jobResult: JobResult)
extends SparkListenerEvent

SparkListenerJobEnd shows the job end timestamp, from which the job duration can be calculated.

Here JobResult is a trait which tells us the job status at completion.

18. SparkListenerTaskGettingResult

Sample json object:

{
"Event": "SparkListenerTaskGettingResult",
"Task Info": {
"Task ID": 1024,
"Index": 7,
"Attempt": 0,
"Launch Time": 1617729607875,
"Executor ID": "0",
"Host": "111.111.111.111",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 1617729608076,
"Finish Time": 0,
"Failed": false,
"Killed": false,
"Accumulables": []
}

Case class definition:

case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

SparkListenerTaskGettingResult shows the getting-result time for a specific task.

19. SparkListenerApplicationEnd

Sample json object:

{
"Event": "SparkListenerApplicationEnd",
"Timestamp": 1617729611879
}

Case class definition:

case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

SparkListenerApplicationEnd only tells us the end timestamp of the application.

20. SparkListenerSQLAdaptiveExecutionUpdate

Sample json object:

{
"Event": "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate",
"executionId": 11,
"physicalPlanDescription": "== Parsed Logical Plan ==...
"sparkPlanInfo": {
"nodeName": "GpuColumnarToRow",
"simpleString": "GpuColumnarToRow false",
"children": [
{

Case class definition:

case class SparkListenerSQLAdaptiveExecutionUpdate(
executionId: Long,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo)
extends SparkListenerEvent

SparkListenerSQLAdaptiveExecutionUpdate is triggered when AQE is on, and it overrides the query plan obtained from the earlier SparkListenerSQLExecutionStart event.

So if AQE is turned on (and a future Spark 3.2 may turn it on by default), make sure to process SparkListenerSQLAdaptiveExecutionUpdate before processing the query plan.

This can impact accumulables because the accumulables are defined inside SparkPlanInfo.

So the best way is to wait until all AQE-related events have arrived, and then deduplicate the collected SparkPlanInfo before starting to calculate any accumulables.
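A minimal sketch of that deduplication, assuming the input lines are only the plan-carrying events (SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveExecutionUpdate) in log order, so the last plan seen per executionId wins:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Keep only the last sparkPlanInfo seen per executionId, so AQE updates
// replace the plan from SparkListenerSQLExecutionStart.
def latestPlanPerExecution(planEventLines: Seq[String]): Map[Long, JValue] = {
  implicit val formats: Formats = DefaultFormats
  planEventLines.foldLeft(Map.empty[Long, JValue]) { (acc, line) =>
    val json = parse(line)
    (json \ "executionId").extractOpt[Long] match {
      case Some(execId) => acc.updated(execId, json \ "sparkPlanInfo")   // later wins
      case None         => acc
    }
  }
}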

21. SparkListenerSQLAdaptiveSQLMetricUpdates

Sample json object:

{
"Event": "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveSQLMetricUpdates",
"executionId": 11,
"sqlPlanMetrics": [
{
"name": "shuffle records written",
"accumulatorId": 1111,
"metricType": "sum"
},
{
"name": "shuffle write time",
"accumulatorId": 2222,
"metricType": "nsTiming"
},

Case class definition:

 case class SparkListenerSQLAdaptiveSQLMetricUpdates(
executionId: Long,
sqlPlanMetrics: Seq[SQLPlanMetric])
extends SparkListenerEvent

Again, accumulables. This event updates/adds accumulable definitions from SQLPlanMetric.

 

All in all, there are many different kinds of events in the Spark event log, and I believe there could be more.

We need to look into the Spark source code to understand how they work together to define the performance metrics at the application, SQL, job, stage, and task levels.

Especially for accumulables, there are at least 6 types of events dealing with them:

  • Define accumulable types: SparkListenerSQLExecutionStart, SparkListenerSQLAdaptiveExecutionUpdate
  • Send accumulable values: SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerDriverAccumUpdates, SparkListenerSQLAdaptiveSQLMetricUpdates

For example, to calculate the max value of an accumulator, you may need to scan through all of the above events to get the real max value.
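For instance, once the (accumulatorId, value) pairs have been pulled out of the task-end accumulables, the stage-completed accumulables, and the driver accumulator updates, the per-accumulator max is a simple fold, sketched below.

// Fold (accumulatorId, value) pairs from all of the above events into a
// per-accumulator maximum.
def maxPerAccumulator(updates: Iterator[(Long, Long)]): Map[Long, Long] =
  updates.foldLeft(Map.empty[Long, Long]) { case (acc, (id, value)) =>
    acc.updated(id, math.max(acc.getOrElse(id, Long.MinValue), value))
  }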

 

 
