Wednesday, March 24, 2021

Hands-on native cuDF Pandas UDF

Goal:

This article shows some hands-on steps to play with native cuDF Pandas UDFs on Spark with the RAPIDS Accelerator for Apache Spark.

Env:

RAPIDS Accelerator for Apache Spark 0.4.1

Spark 3.1.1

RTX 6000 GPU

Concept:

As we know, Spark introduced the Pandas UDF (a.k.a. Vectorized UDF) feature in Spark 2.3, which brought huge performance gains.

Here we will introduce the native cuDF version of the Pandas UDF (which runs natively on the GPU), enabled by the RAPIDS Accelerator for Apache Spark.

The parameters below control this behavior:

  • spark.rapids.python.concurrentPythonWorkers : Number of Python worker processes that can execute concurrently per GPU.
  • spark.rapids.python.memory.gpu.allocFraction : The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers.
  • spark.rapids.python.memory.gpu.maxAllocFraction : The fraction of total GPU memory that limits the maximum size of the RMM pool for all the Python workers.
  • spark.rapids.python.memory.gpu.pooling.enabled : Should RMM in Python workers act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly.

If we enable this feature, Python worker processes will share GPU memory with the Spark Executors. Please read this post for more details on GPU pool memory allocation for Spark+RAPIDS.

As a result, we need to divide the GPU memory between the Spark Executors and the Python worker processes.

Here I am allocating 40% of the GPU memory to Python workers and 50% to Spark Executors by setting the following in spark-defaults.conf:

spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.gpu.allocFraction 0.5
spark.rapids.python.memory.gpu.allocFraction 0.4
spark.rapids.python.memory.gpu.maxAllocFraction 0.4

Then I allow 2 concurrent Python workers:

spark.rapids.python.concurrentPythonWorkers 2

Since the RTX 6000 has 24 GB of GPU memory, when the Python workers are running you may see the following DEBUG message in the Executor log:

DEBUG: Pooled memory, pool size: 4844.0625 MiB, max size: 8796093022208.0 MiB

This matches the math: 24 GB * 0.4 / 2 = 4.8 GB per Python worker.
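
The same arithmetic as a tiny Python sketch (values copied from the configuration above):

# Per-worker pool = total GPU memory * allocFraction / concurrent workers
total_gpu_mem_gb = 24      # RTX 6000
alloc_fraction = 0.4       # spark.rapids.python.memory.gpu.allocFraction
workers = 2                # spark.rapids.python.concurrentPythonWorkers
print(total_gpu_mem_gb * alloc_fraction / workers)  # 4.8 (GB per Python worker)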

Note: the default spark.rapids.memory.gpu.allocFraction is 0.9, so if the memory is not divided properly you may hit the error below in some tasks' logs:

MemoryError: std::bad_alloc: RMM failure at:/home/xxx/xxx/envs/rapids-0.18/include/rmm/mr/device/pool_memory_resource.hpp:188: Maximum pool size exceeded

Solution:

1. Install the cuDF Python dependency

Make sure the cuDF library is installed in your Python env.

You can follow this rapids.ai getting started guide to install the libraries in your conda env on all nodes.

For example:

conda create -n rapids-0.18 -c rapidsai -c nvidia -c conda-forge \
-c defaults cudf=0.18 python=3.8 cudatoolkit=11.0
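
After installation, a quick sanity check from the same conda env should print the cuDF version and run a trivial computation on the GPU:

import cudf
print(cudf.__version__)            # should show 0.18.x
print(cudf.Series([1, 2, 3]) + 1)  # trivial computation on the GPU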

Note: If you cannot install the cuDF library on all nodes for some reason, you may need to package the whole conda env and distribute it to all Spark Executors, which can be very time consuming. For example, in this post I used that approach to run the test framework.

After that, make sure the Python used by PySpark points to the correct conda env by setting PYSPARK_PYTHON in spark-env.sh on all nodes:

export PYSPARK_PYTHON=/xxx/xxx/MYGLOBALENV/rapids-0.18/bin/python
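
To double-check that both the driver and the executors pick up this interpreter, you can run a quick check from the pyspark shell (a minimal sketch):

# Driver-side interpreter
import sys
print(sys.executable)

# Executor-side interpreter, checked via a trivial job
print(sc.parallelize([0]).map(lambda _: __import__('sys').executable).collect())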

2. RAPIDS Accelerator for Apache Spark is set up properly

I am assuming you have already set the RAPIDS Accelerator for Apache Spark related parameters properly and that it is working fine.

In particular, spark.driver.extraJavaOptions and spark.executor.extraJavaOptions should set the JVM timezone to UTC, as per this post.

spark.executor.extraClassPath and spark.driver.extraClassPath should include the cudf jar and the rapids-4-spark jar.
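
For example, the related entries in spark-defaults.conf could look like below (the cudf jar name and the paths are assumptions based on my env; adjust them to yours):

spark.driver.extraJavaOptions -Duser.timezone=UTC
spark.executor.extraJavaOptions -Duser.timezone=UTC
spark.driver.extraClassPath /home/xxx/spark/rapids/cudf-0.18.1-cuda11.jar:/home/xxx/spark/rapids/rapids-4-spark_2.12-0.4.1.jar
spark.executor.extraClassPath /home/xxx/spark/rapids/cudf-0.18.1-cuda11.jar:/home/xxx/spark/rapids/rapids-4-spark_2.12-0.4.1.jar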

3. Launch pyspark and test different kinds of UDFs

pyspark --conf spark.executorEnv.PYTHONPATH="/home/xxx/spark/rapids/rapids-4-spark_2.12-0.4.1.jar" 

Here, make sure you specify the correct path to the rapids-4-spark jar.

Import needed python libs and create a sample dataframe:

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
import cudf
import pandas as pd

# Prepare sample data
small_data = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
df = spark.createDataFrame(small_data, ("id", "v"))
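
For reference, df.show() on this small dataframe should print something like:

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+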

3.a Row-at-a-time UDF

# Use udf to define a row-at-a-time udf
@udf('double')
# Input/output are both a single double value
def plus_one(v):
    return v + 1

df.withColumn('v2', plus_one(df.v)).show()
df.withColumn('v2', plus_one(df.v)).explain()

Output:

21/03/24 17:58:42 WARN GpuOverrides:
!NOT_FOUND <BatchEvalPythonExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.python.BatchEvalPythonExec could be found
  @Expression <PythonUDF> plus_one(v#1) could not block GPU acceleration
  @Expression <AttributeReference> v#1 could run on GPU
  @Expression <AttributeReference> pythonUDF0#28 could run on GPU
!NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
  @Expression <AttributeReference> id#0L could run on GPU
  @Expression <AttributeReference> v#1 could run on GPU

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [id#0L, v#1, pythonUDF0#28 AS v2#24]
   +- GpuRowToColumnar TargetSize(2147483647)
      +- BatchEvalPython [plus_one(v#1)], [pythonUDF0#28]
         +- *(1) Scan ExistingRDD[id#0L,v#1]

As we can see, "BatchEvalPython" is not running on GPU.

3.b Pandas UDF

To test the query plan or performance of a plain Pandas UDF, we need to disable the cuDF Pandas UDF related parameters mentioned above, such as spark.rapids.sql.python.gpu.enabled.

# Use pandas_udf to define a Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1

df.withColumn('v2', pandas_plus_one(df.v)).show()
df.withColumn('v2', pandas_plus_one(df.v)).explain()

Output:

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [id#102L, v#103, pythonUDF0#136 AS v2#132]
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuArrowEvalPython [pandas_plus_one(v#103)], [pythonUDF0#136], 200
         +- GpuRowToColumnar TargetSize(2147483647)
            +- *(1) Scan ExistingRDD[id#102L,v#103]

As we can see, it is done by GpuArrowEvalPython.

From the Spark Executor log, we can see "PythonUDFRunner" is started to do the work.

When the job is running, the Python daemon processes are "pyspark.daemon":

python -m pyspark.daemon
...
python -m pyspark.daemon

3.c cuDF Pandas UDF

@pandas_udf('double')
def cudf_pandas_plus_one(v: pd.Series) -> pd.Series:
    gpu_series = cudf.Series(v)
    gpu_series = gpu_series + 1
    return gpu_series.to_pandas()

df.withColumn('v2', cudf_pandas_plus_one(df.v)).show()
df.withColumn('v2', cudf_pandas_plus_one(df.v)).explain()

Output:

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [id#0L, v#1, pythonUDF0#74 AS v2#70]
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuArrowEvalPython [cudf_pandas_plus_one(v#1)], [pythonUDF0#74], 200
         +- GpuRowToColumnar TargetSize(2147483647)
            +- *(1) Scan ExistingRDD[id#0L,v#1]

As we can see, it is done by GpuArrowEvalPython, with the same plan as in 3.b above.

From the Spark Executor log, we can see "GpuArrowPythonRunner" is started to do the work.

When the job is running, the Python daemon processes are "rapids.daemon":

python -m rapids.daemon
...
python -m rapids.daemon

For more types of native cuDF Pandas UDFs, please refer to this test python code.
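
For example, a grouped-map style UDF can follow the same pattern as the scalar example above (a sketch I wrote for illustration, not taken from the referenced test code):

# Subtract each group's mean from v, computing on the GPU via cuDF
def gpu_subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    gdf = cudf.DataFrame.from_pandas(pdf)  # copy the group's rows to the GPU
    gdf['v'] = gdf['v'] - gdf['v'].mean()  # vectorized computation on the GPU
    return gdf.to_pandas()                 # back to pandas for the Arrow transfer

df.groupby('id').applyInPandas(gpu_subtract_mean, schema='id long, v double').show()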

References:

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

