Thursday, March 25, 2021

concat_ws example on Spark with RAPIDS Accelerator

Goal:

This is a quick example of the concat_ws function on Spark with the RAPIDS Accelerator.

Env:

Spark 3.1.1

RAPIDS Accelerator for Apache Spark 0.4.1

Solution:

1. concat_ws can join an array of strings into a single string with a separator.

Below is a quick example using Scala:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, ArrayType}

// Two sample rows: an id and an array of strings
val data = Seq(
  Row(1, List("orange", "banana", "apple")),
  Row(2, List("a", "b", "c"))
)

val schema = StructType(Array(
  StructField("idx", IntegerType, true),
  StructField("arrays", ArrayType(StringType), true)
))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
// Join each array into one comma-separated string
val df2 = df.withColumn("concat_array", concat_ws(",", col("arrays")))
df2.show()
df2.explain()

The output with RAPIDS Accelerator for Apache Spark 0.4.1 is:

scala> df2.show

+---+--------------------+-------------------+
|idx|              arrays|       concat_array|
+---+--------------------+-------------------+
|  1|[orange, banana, ...|orange,banana,apple|
|  2|           [a, b, c]|              a,b,c|
+---+--------------------+-------------------+


scala> df2.explain()
21/03/25 21:01:48 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AttributeReference> idx#2 could run on GPU
  @Expression <AttributeReference> arrays#3 could run on GPU
  @Expression <Alias> concat_ws(,, arrays#3) AS concat_array#15 could run on GPU
    !NOT_FOUND <ConcatWs> concat_ws(,, arrays#3) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.ConcatWs could be found
      @Expression <Literal> , could run on GPU
      @Expression <AttributeReference> arrays#3 could run on GPU
  !NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
    @Expression <AttributeReference> idx#2 could run on GPU
    @Expression <AttributeReference> arrays#3 could run on GPU

== Physical Plan ==
*(1) Project [idx#2, arrays#3, concat_ws(,, arrays#3) AS concat_array#15]
+- *(1) Scan ExistingRDD[idx#2,arrays#3]

As you can see, concat_ws is not supported on RAPIDS Accelerator 0.4.1: no GPU version of ConcatWs is found, so the whole Project falls back to the CPU.
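
One way to reproduce this kind of check is sketched below. It assumes the session was started with the RAPIDS Accelerator plugin enabled, and that the plugin's spark.rapids.sql.explain setting is what produces GpuOverrides warnings like the ones shown above. The helper planMentionsGpu is a hypothetical name introduced here for illustration, and matching on the "Gpu" prefix is only a heuristic, not a plugin API.

```scala
import org.apache.spark.sql.DataFrame

// Assumption: running inside spark-shell with the RAPIDS Accelerator plugin loaded.
// NOT_ON_GPU asks the plugin to log only the parts of a query that stay on the CPU.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")

// Hypothetical helper: GPU operators appear in the physical plan with a
// "Gpu" prefix (for example GpuProject), so a plain string match tells us
// whether at least one operator was placed on the GPU.
def planMentionsGpu(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.toString.contains("Gpu")
```

With the DataFrame above, planMentionsGpu(df2) should return false on 0.4.1, because no Gpu operator appears in its physical plan.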

2. concat_ws can concatenate multiple columns together with a separator.

import org.apache.spark.sql.Row
// concat and lit are used by the concat comparison further down
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Two sample rows: an id and three string columns
val data = Seq(
  Row(1, "orange", "banana", "apple"),
  Row(2, "a", "b", "c")
)

val schema = StructType(Array(
  StructField("idx", IntegerType, true),
  StructField("s1", StringType, true),
  StructField("s2", StringType, true),
  StructField("s3", StringType, true)
))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
// Join all four columns into one comma-separated string (idx is cast to string)
val df2 = df.withColumn("concat_array", concat_ws(",", col("idx"), col("s1"), col("s2"), col("s3")))
df2.show()
df2.explain()

The output with RAPIDS Accelerator for Apache Spark 0.4.1 is:

scala> df2.show

+---+------+------+-----+--------------------+
|idx|    s1|    s2|   s3|        concat_array|
+---+------+------+-----+--------------------+
|  1|orange|banana|apple|1,orange,banana,a...|
|  2|     a|     b|    c|             2,a,b,c|
+---+------+------+-----+--------------------+


scala> df2.explain()
21/03/25 21:19:11 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
  @Expression <AttributeReference> idx#65 could run on GPU
  @Expression <AttributeReference> s1#66 could run on GPU
  @Expression <AttributeReference> s2#67 could run on GPU
  @Expression <AttributeReference> s3#68 could run on GPU
  @Expression <Alias> concat_ws(,, cast(idx#65 as string), s1#66, s2#67, s3#68) AS concat_array#73 could run on GPU
    !NOT_FOUND <ConcatWs> concat_ws(,, cast(idx#65 as string), s1#66, s2#67, s3#68) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.ConcatWs could be found
      @Expression <Literal> , could run on GPU
      @Expression <Cast> cast(idx#65 as string) could run on GPU
        @Expression <AttributeReference> idx#65 could run on GPU
      @Expression <AttributeReference> s1#66 could run on GPU
      @Expression <AttributeReference> s2#67 could run on GPU
      @Expression <AttributeReference> s3#68 could run on GPU
  !NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
    @Expression <AttributeReference> idx#65 could run on GPU
    @Expression <AttributeReference> s1#66 could run on GPU
    @Expression <AttributeReference> s2#67 could run on GPU
    @Expression <AttributeReference> s3#68 could run on GPU

== Physical Plan ==
*(1) Project [idx#65, s1#66, s2#67, s3#68, concat_ws(,, cast(idx#65 as string), s1#66, s2#67, s3#68) AS concat_array#73]
+- *(1) Scan ExistingRDD[idx#65,s1#66,s2#67,s3#68]

The same happens here: concat_ws is not supported on RAPIDS Accelerator 0.4.1, so the Project falls back to the CPU.

Let's compare this with the concat function on the same DataFrame:

val df3 = df.withColumn("concat_array", concat(col("idx"), lit(','), col("s1"), lit(','), col("s2"), lit(','), col("s3") ))
df3.show()
df3.explain()

The output for concat with RAPIDS Accelerator for Apache Spark 0.4.1 is:

scala> df3.show()

+---+------+------+-----+--------------------+
|idx|    s1|    s2|   s3|        concat_array|
+---+------+------+-----+--------------------+
|  1|orange|banana|apple|1,orange,banana,a...|
|  2|     a|     b|    c|             2,a,b,c|
+---+------+------+-----+--------------------+


scala> df3.explain()
21/03/25 21:26:28 WARN GpuOverrides:
!NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
  @Expression <AttributeReference> idx#65 could run on GPU
  @Expression <AttributeReference> s1#66 could run on GPU
  @Expression <AttributeReference> s2#67 could run on GPU
  @Expression <AttributeReference> s3#68 could run on GPU

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [idx#65, s1#66, s2#67, s3#68, gpuconcat(cast(idx#65 as string), ,, s1#66, ,, s2#67, ,, s3#68) AS concat_array#100]
   +- GpuRowToColumnar TargetSize(2147483647)
      +- *(1) Scan ExistingRDD[idx#65,s1#66,s2#67,s3#68]

Because concat is a supported operator, the projection runs on the GPU as "GpuProject", as the plan above shows.

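In this scenario, you can rewrite concat_ws with concat so that the expression runs on the GPU in RAPIDS Accelerator 0.4.1. Note that the two are only equivalent when no input is null: concat returns null if any input is null, while concat_ws skips null inputs.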
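
To avoid writing the separator literals by hand, the rewrite can be wrapped in a small helper. The sketch below is only an illustration: concatWithSep is a hypothetical name, not part of Spark or the RAPIDS Accelerator, and it assumes Spark 3.1.x as used in this post.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, concat, lit}

// Hypothetical helper: builds concat(c1, sep, c2, sep, ..., cN).
// Unlike concat_ws, the result is null if any input column is null.
def concatWithSep(sep: String, cols: Column*): Column = {
  // Put the separator literal in front of every column, then drop the leading one
  val parts = cols.flatMap(c => Seq(lit(sep), c)).drop(1)
  concat(parts: _*)
}

// Usage with the DataFrame from the example above:
// val df3 = df.withColumn("concat_array",
//   concatWithSep(",", col("idx"), col("s1"), col("s2"), col("s3")))
```

This builds the same expression as the hand-written concat call above, so it should be planned the same way.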

Note: the tests above are based on RAPIDS Accelerator 0.4.1. Future versions should support more operators.

For the current list of supported operators, always refer to the RAPIDS Accelerator documentation.


 
