Thursday, April 29, 2021

How to use Spark Operator to run a Spark job with the Rapids Accelerator

Goal:

This article shares the steps on how to run a Spark job with the Rapids Accelerator using Spark Operator in a Kubernetes cluster.

Env:

Spark 3.1.1

Rapids Accelerator 0.4.1 with cuDF 0.18.1

Kubernetes Cluster 1.19

Spark Operator

Solution:

As per SPARK-33005, Spark on Kubernetes is GA in Spark 3.1.1.

The official Rapids Accelerator documentation, Getting Started with RAPIDS and Kubernetes, covers how to use spark-submit/spark-shell to submit Spark jobs directly to a Kubernetes cluster.

This article will mainly focus on how to use Spark Operator to do the same thing.

Here we assume you already have a working Kubernetes cluster with NVIDIA GPU support, and have built your own Spark Docker image by following the Getting Started with RAPIDS and Kubernetes guide mentioned above.
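As a quick sanity check (not part of the original guide), you can confirm that the nodes actually advertise GPUs to Kubernetes before going further; this assumes the NVIDIA device plugin is already installed:

# Each GPU node should report nvidia.com/gpu under Capacity and Allocatable
$ kubectl describe nodes | grep "nvidia.com/gpu"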

1. Copy your application into the Docker image

When following the Getting Started with RAPIDS and Kubernetes guide above, make sure you modify the Dockerfile to copy your application (such as jars or Python files) into the Docker image.

This is because, as of today, the Spark Operator user guide states: "A SparkApplication should set .spec.deployMode to cluster, as client is not currently implemented. The driver pod will then run spark-submit in client mode internally to run the driver program."

Here we create the test.py below and copy it into the Docker image under the directory "/opt/sparkRapidsPlugin":

from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext

# Create (or reuse) the SparkContext with the configuration passed in via spark-submit
conf = SparkConf()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Build a tiny one-column DataFrame and register it as a temp view
df = sqlContext.createDataFrame([1, 2, 3], "int").toDF("value")
df.createOrReplaceTempView("df")

# The filter should show up as GPU operators in the physical plan
sqlContext.sql("SELECT * FROM df WHERE value<>1").explain()
sqlContext.sql("SELECT * FROM df WHERE value<>1").show()
sc.stop()

Modify the Dockerfile to add the line below:

COPY test.py /opt/sparkRapidsPlugin
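After modifying the Dockerfile, rebuild and push the image so that the Spark Operator can pull it later. A minimal sketch, where the repository and tag below are placeholders rather than values from the original guide:

$ docker build -t <your-repo>/spark-rapids:3.1.1 .
$ docker push <your-repo>/spark-rapids:3.1.1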

2. Create the Spark Operator in a namespace named "spark-operator" using the Helm chart

Here we just follow the Spark Operator quick start guide.

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace

Later, if you want to delete this chart release, use the command below:

helm uninstall my-release --namespace spark-operator

3. Check what objects are created in the Kubernetes cluster

$ kubectl get pods -n spark-operator
NAME                                        READY   STATUS    RESTARTS   AGE
my-release-spark-operator-599f575d4-cjlmz   1/1     Running   0          62s

$ kubectl get deployment -n spark-operator
NAME                        READY   UP-TO-DATE   AVAILABLE   AGE
my-release-spark-operator   1/1     1            1           101s

$ kubectl get clusterrolebinding |grep spark-operator
my-release-spark-operator ClusterRole/my-release-spark-operator 5m28s

$ kubectl describe clusterrolebinding my-release-spark-operator
Name:         my-release-spark-operator
Labels:       app.kubernetes.io/instance=my-release
              app.kubernetes.io/managed-by=Helm
              app.kubernetes.io/name=spark-operator
              app.kubernetes.io/version=v1beta2-1.2.3-3.1.1
              helm.sh/chart=spark-operator-1.1.0
Annotations:  meta.helm.sh/release-name: my-release
              meta.helm.sh/release-namespace: spark-operator
Role:
  Kind:  ClusterRole
  Name:  my-release-spark-operator
Subjects:
  Kind            Name                       Namespace
  ----            ----                       ---------
  ServiceAccount  my-release-spark-operator  spark-operator


$ kubectl get role -n spark-operator
NAME         CREATED AT
spark-role   2021-04-29T16:16:32Z

4. Check the status of spark-operator

$ helm status --namespace spark-operator my-release
NAME: my-release
LAST DEPLOYED: Thu Apr 29 09:20:14 2021
NAMESPACE: spark-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None

5. Run a Spark Pi job without using Rapids Accelerator

This is just to make sure Spark Operator itself is working, so that later troubleshooting is not complicated by the Rapids Accelerator setup.

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator
kubectl apply -f examples/spark-pi.yaml

Note: the driver pod will use the "spark" service account by default, so make sure you have either granted enough privileges to "spark" or modified the YAML file to fit your needs.
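If the "spark" service account does not exist yet in the default namespace, something like the following is usually enough for this test (a rough sketch; the binding name is arbitrary and the edit ClusterRole is broader than strictly required):

$ kubectl create serviceaccount spark
$ kubectl create clusterrolebinding spark-edit --clusterrole=edit --serviceaccount=default:spark --namespace=default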

It should complete successfully:

$ kubectl get pods
NAME              READY   STATUS      RESTARTS   AGE
spark-pi-driver   0/1     Completed   0          48s

You can also check the status of the sparkapplications custom resource (defined by the Spark Operator's CRD, i.e. custom resource definition) using kubectl:

$ kubectl get sparkapplications spark-pi -o=yaml
...
status:
  applicationState:
    state: COMPLETED
...

Or describe it to get the events: 

$ kubectl describe sparkapplication spark-pi
...
Events:
Type    Reason                     Age                    From            Message
----    ------                     ----                   ----            -------
Normal  SparkApplicationAdded      7m22s                  spark-operator  SparkApplication spark-pi was added, enqueuing it for submission
Normal  SparkApplicationSubmitted  7m20s                  spark-operator  SparkApplication spark-pi was submitted successfully
Normal  SparkDriverRunning         7m9s                   spark-operator  Driver spark-pi-driver is running
Normal  SparkExecutorPending       7m4s                   spark-operator  Executor spark-pi-d25689791e785e41-exec-1 is pending
Normal  SparkExecutorRunning       7m1s                   spark-operator  Executor spark-pi-d25689791e785e41-exec-1 is running
Normal  SparkExecutorCompleted     6m58s (x2 over 6m58s)  spark-operator  Executor spark-pi-d25689791e785e41-exec-1 completed
Normal  SparkDriverCompleted       6m58s (x2 over 6m58s)  spark-operator  Driver spark-pi-driver completed
Normal  SparkApplicationCompleted  6m58s                  spark-operator  SparkApplication spark-pi completed
...

6. Build sparkctl

sparkctl provides more functionality for managing Spark applications on K8s than plain kubectl. It ships inside the Spark Operator repo we cloned earlier.

Let's build it and use it instead of kubectl.

6.1 Install Golang

Follow https://golang.org/doc/install to install Golang on Mac.

After that, set the PATH in .bash_profile:

export PATH=$PATH:/usr/local/go/bin

6.2 Build sparkctl

cd sparkctl
go build -o sparkctl

After that, add this sparkctl binary to your PATH as well.
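For example (a minimal sketch, assuming you are still in the sparkctl build directory):

$ export PATH=$PATH:$(pwd)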

7. Run a Spark job with Rapids Accelerator

7.1 Create a YAML file named testpython-rapids.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: testpython-rapids
  namespace: default
spec:
  sparkConf:
    "spark.ui.port": "4045"
    "spark.rapids.sql.concurrentGpuTasks": "1"
    "spark.executor.resource.gpu.amount": "1"
    "spark.task.resource.gpu.amount": "1"
    "spark.executor.memory": "1g"
    "spark.rapids.memory.pinnedPool.size": "2g"
    "spark.executor.memoryOverhead": "3g"
    "spark.locality.wait": "0s"
    "spark.sql.files.maxPartitionBytes": "512m"
    "spark.sql.shuffle.partitions": "10"
    "spark.plugins": "com.nvidia.spark.SQLPlugin"
    "spark.executor.resource.gpu.discoveryScript": "/opt/sparkRapidsPlugin/getGpusResources.sh"
    "spark.executor.resource.gpu.vendor": "nvidia.com"
    "spark.executor.extraClassPath": "/opt/sparkRapidsPlugin/rapids-4-spark.jar:/opt/sparkRapidsPlugin/cudf.jar"
    "spark.driver.extraClassPath": "/opt/sparkRapidsPlugin/rapids-4-spark.jar:/opt/sparkRapidsPlugin/cudf.jar"
  type: Python
  pythonVersion: 3
  mode: cluster
  image: "<image>"
  imagePullPolicy: Always
  mainApplicationFile: "local:///opt/sparkRapidsPlugin/test.py"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "1024m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "5000m"
    gpu:
      name: "nvidia.com/gpu"
      quantity: 1
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

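Before submitting, it can be worth double-checking that the plugin jars, the GPU discovery script and test.py exist in the image at the paths referenced above. A quick sanity check, assuming Docker is available locally (replace <image> with your actual image name):

# Should list rapids-4-spark.jar, cudf.jar, getGpusResources.sh and test.py
$ docker run --rm <image> ls /opt/sparkRapidsPlugin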
7.2 Submit testpython-rapids

sparkctl create testpython-rapids.yaml

7.3 Check status of testpython-rapids

sparkctl status testpython-rapids

7.4 Check driver log

sparkctl log testpython-rapids

It should show the GPU-related query plan and the job results.

== Physical Plan ==
GpuColumnarToRow false
+- GpuFilter (gpuisnotnull(value#0) AND NOT (value#0 = 1))
   +- GpuRowToColumnar TargetSize(2147483647)
      +- *(1) Scan ExistingRDD[value#0]

7.5 Check executor log (when it is running)

sparkctl log testpython-rapids -e 1

7.6 Check the events

sparkctl event testpython-rapids

7.7 Port forwarding (when the driver is running)

sparkctl forward testpython-rapids --local-port 1234 --remote-port 4045

Then open localhost:1234 in a browser.

Note: here the remote port 4045 is what we set for "spark.ui.port" in the testpython-rapids.yaml.

7.8 Delete the spark job

sparkctl delete testpython-rapids

