Friday, June 8, 2018

How to control the parallelism of a Spark job

Goal:

This article explains how to control the parallelism of a Spark job, using a Spark on YARN job as a demonstration.

Env:

Spark 2.1.0
MapR 5.2

Solution:

  1. A Spark job's parallelism is decided by the number of executors (YARN containers, for a Spark on YARN job).

    As covered in the previous article -- Resource allocation configurations for Spark on Yarn -- this is controlled by the option spark.executor.instances (--num-executors).
  2. Each executor's parallelism is decided by the number of tasks that can run concurrently inside it.

    Suppose N tasks are assigned to an executor; however, only X of them can run at the same time. Here X is the parallelism of the task threads inside each executor.

    The total number of CPU cores for each executor (YARN container, for Spark on YARN) is determined by spark.executor.cores (--executor-cores).
    The number of CPU cores needed by each task is determined by spark.task.cpus (default: 1).
    So X = spark.executor.cores / spark.task.cpus (see the sketch after this list).
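
A minimal sketch tying the two knobs together (the resource numbers are arbitrary, and the examples jar path is an assumption based on the MapR layout used below): this submission asks for 2 executors with 4 cores each, and since each task needs 2 cores, X = 4/2 = 2 tasks can run concurrently inside each executor.
/opt/mapr/spark/spark-2.1.0/bin/spark-submit --master yarn --deploy-mode client \
  --num-executors 2 \
  --executor-cores 4 \
  --conf spark.task.cpus=2 \
  --class org.apache.spark.examples.SparkPi \
  /opt/mapr/spark/spark-2.1.0/examples/jars/spark-examples_2.11-2.1.0.jar 10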

Demo:

Let's run a Spark on YARN example job that needs 10 tasks:
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client SparkPi 10

1. Default: spark.executor.cores=1, spark.task.cpus=1

The ResourceManager allocates 3 YARN containers. The first (smaller) one hosts the ApplicationMaster; the other 2 are the executors:
2018-06-08 12:47:41,369 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000001 of capacity <memory:1024, vCores:1, disks:0.0> on host s2.poc.com:27338, which has 1 containers, <memory:1024, vCores:1, disks:0.0> used and <memory:4096, vCores:1, disks:1.33> available after allocation
2018-06-08 12:47:48,201 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000002 of capacity <memory:3072, vCores:1, disks:0.0> on host s4.poc.com:53027, which has 1 containers, <memory:3072, vCores:1, disks:0.0> used and <memory:10849, vCores:1, disks:1.33> available after allocation
2018-06-08 12:47:48,202 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0010_01_000003 of capacity <memory:3072, vCores:1, disks:0.0> on host s3.poc.com:11447, which has 1 containers, <memory:3072, vCores:1, disks:0.0> used and <memory:2048, vCores:1, disks:1.33> available after allocation
From the Spark UI, 5 tasks were assigned to each executor: 5 on s4 and 5 on s3.
Now let's look at the YARN container log for s4:
[root@s4 container_e07_1511207998241_0010_01_000002]# egrep "Running task|Finished task" stderr
2018-06-08 12:47:55,273 INFO  [Executor task launch worker-0] executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
2018-06-08 12:47:55,712 INFO  [Executor task launch worker-0] executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1856 bytes result sent to driver
2018-06-08 12:47:55,723 INFO  [Executor task launch worker-0] executor.Executor: Running task 3.0 in stage 0.0 (TID 3)
2018-06-08 12:47:55,750 INFO  [Executor task launch worker-0] executor.Executor: Finished task 3.0 in stage 0.0 (TID 3). 1128 bytes result sent to driver
2018-06-08 12:47:55,759 INFO  [Executor task launch worker-0] executor.Executor: Running task 4.0 in stage 0.0 (TID 4)
2018-06-08 12:47:55,780 INFO  [Executor task launch worker-0] executor.Executor: Finished task 4.0 in stage 0.0 (TID 4). 1041 bytes result sent to driver
2018-06-08 12:47:55,792 INFO  [Executor task launch worker-0] executor.Executor: Running task 6.0 in stage 0.0 (TID 6)
2018-06-08 12:47:55,812 INFO  [Executor task launch worker-0] executor.Executor: Finished task 6.0 in stage 0.0 (TID 6). 1128 bytes result sent to driver
2018-06-08 12:47:55,821 INFO  [Executor task launch worker-0] executor.Executor: Running task 8.0 in stage 0.0 (TID 8)
2018-06-08 12:47:55,839 INFO  [Executor task launch worker-0] executor.Executor: Finished task 8.0 in stage 0.0 (TID 8). 1041 bytes result sent to driver
From the timestamps above, only 1 task was running at a time inside each executor (YARN container), which matches X = 1/1 = 1.
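
If the container directory on the node is not directly accessible, the same executor log can also be pulled with the YARN CLI (assuming log aggregation is enabled; the application ID below is derived from the container IDs above):
yarn logs -applicationId application_1511207998241_0010 | egrep "Running task|Finished task"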

2. spark.executor.cores=2, spark.task.cpus=1
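
To give each executor 2 cores, add the --executor-cores option (or set spark.executor.cores) to the same example job; a sketch of such a run:
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client --executor-cores 2 SparkPi 10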

Again, the ResourceManager allocates 2 executors (plus 1 ApplicationMaster container); this time each executor has 2 vCores.
2018-06-08 13:28:36,122 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000001 of capacity <memory:1024, vCores:1, disks:0.0> on host s3.poc.com:11447, which has 1 containers, <memory:1024, vCores:1, disks:0.0> used and <memory:4096, vCores:1, disks:1.33> available after allocation
2018-06-08 13:28:42,288 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000003 of capacity <memory:2048, vCores:2, disks:0.0> on host s2.poc.com:27338, which has 1 containers, <memory:2048, vCores:2, disks:0.0> used and <memory:3072, vCores:0, disks:1.33> available after allocation
2018-06-08 13:28:42,683 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_e07_1511207998241_0012_01_000004 of capacity <memory:2048, vCores:2, disks:0.0> on host s1.poc.com:39579, which has 1 containers, <memory:2048, vCores:2, disks:0.0> used and <memory:3072, vCores:0, disks:1.33> available after allocation
From the Spark UI, 7 tasks were assigned to the executor on s1, and 3 tasks to the executor on s2:
YARN container log for s1:
# egrep "Running task|Finished task" stderr
2018-06-08 13:28:49,451 INFO  [Executor task launch worker-0] executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
2018-06-08 13:28:49,451 INFO  [Executor task launch worker-1] executor.Executor: Running task 3.0 in stage 0.0 (TID 3)
2018-06-08 13:28:49,927 INFO  [Executor task launch worker-1] executor.Executor: Finished task 3.0 in stage 0.0 (TID 3). 1041 bytes result sent to driver
2018-06-08 13:28:49,929 INFO  [Executor task launch worker-0] executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1856 bytes result sent to driver
2018-06-08 13:28:49,940 INFO  [Executor task launch worker-0] executor.Executor: Running task 4.0 in stage 0.0 (TID 4)
2018-06-08 13:28:49,950 INFO  [Executor task launch worker-1] executor.Executor: Running task 5.0 in stage 0.0 (TID 5)
2018-06-08 13:28:49,963 INFO  [Executor task launch worker-0] executor.Executor: Finished task 4.0 in stage 0.0 (TID 4). 1041 bytes result sent to driver
2018-06-08 13:28:49,970 INFO  [Executor task launch worker-0] executor.Executor: Running task 6.0 in stage 0.0 (TID 6)
2018-06-08 13:28:50,021 INFO  [Executor task launch worker-1] executor.Executor: Finished task 5.0 in stage 0.0 (TID 5). 1041 bytes result sent to driver
2018-06-08 13:28:50,023 INFO  [Executor task launch worker-0] executor.Executor: Finished task 6.0 in stage 0.0 (TID 6). 1041 bytes result sent to driver
2018-06-08 13:28:50,033 INFO  [Executor task launch worker-1] executor.Executor: Running task 8.0 in stage 0.0 (TID 8)
2018-06-08 13:28:50,036 INFO  [Executor task launch worker-0] executor.Executor: Running task 7.0 in stage 0.0 (TID 7)
2018-06-08 13:28:50,061 INFO  [Executor task launch worker-0] executor.Executor: Finished task 7.0 in stage 0.0 (TID 7). 1041 bytes result sent to driver
2018-06-08 13:28:50,078 INFO  [Executor task launch worker-1] executor.Executor: Finished task 8.0 in stage 0.0 (TID 8). 1041 bytes result sent to driver
YARN container log for s2:
# egrep "Running task|Finished task" stderr
2018-06-08 13:28:49,450 INFO  [Executor task launch worker-1] executor.Executor: Running task 2.0 in stage 0.0 (TID 2)
2018-06-08 13:28:49,450 INFO  [Executor task launch worker-0] executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
2018-06-08 13:28:50,055 INFO  [Executor task launch worker-1] executor.Executor: Finished task 2.0 in stage 0.0 (TID 2). 1041 bytes result sent to driver
2018-06-08 13:28:50,056 INFO  [Executor task launch worker-0] executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1856 bytes result sent to driver
2018-06-08 13:28:50,071 INFO  [Executor task launch worker-0] executor.Executor: Running task 9.0 in stage 0.0 (TID 9)
2018-06-08 13:28:50,113 INFO  [Executor task launch worker-0] executor.Executor: Finished task 9.0 in stage 0.0 (TID 9). 1041 bytes result sent to driver
From the timestamps above, 2 tasks were running concurrently inside each executor (YARN container), which matches X = 2/1 = 2.
In all, the test results above confirm the theory that the parallelism of tasks inside each executor is X = spark.executor.cores / spark.task.cpus.
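
Going one step further (a sketch only, not run in this demo), raising spark.task.cpus moves X in the other direction: keeping 2 cores per executor but requiring 2 cores per task would bring the in-executor parallelism back down to X = 2/2 = 1.
/opt/mapr/spark/spark-2.1.0/bin/run-example --master yarn --deploy-mode client --executor-cores 2 --conf spark.task.cpus=2 SparkPi 10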

