Thursday, March 18, 2021

Which Dataset API usage is not supported by RAPIDS Accelerator for Apache Spark

Goal:

This article explains which Dataset API usage is not supported by the RAPIDS Accelerator for Apache Spark.

Env:

Spark 3.0.2
RAPIDS Accelerator for Apache Spark 0.3

Solution:

Currently the RAPIDS Accelerator for Apache Spark does not support the typed Dataset API, but it does support the DataFrame API.
A DataFrame is simply Dataset[Row], so what does that mean in practice?
Compared with the DataFrame API, the Dataset API adds compile-time type safety and lets you work with typed JVM objects.
If you rely on that compile-time checking, for example by passing a Scala lambda that reads fields of the typed object, the operator may not be able to run on the GPU.
Here is a simple example in spark-shell using Scala:

1. Create a sample Dataset

import org.apache.spark.sql.Dataset
case class customer (
c_customer_sk: Int,
c_customer_id: String,
c_current_cdemo_sk: Int,
c_current_hdemo_sk: Int,
c_current_addr_sk: Int
)

val df=spark.sql("select c_customer_sk,c_customer_id,c_current_cdemo_sk,c_current_hdemo_sk,c_current_addr_sk from tpcds.customer limit 10")
val ds: Dataset[customer] = df.as[customer]

2. Working on GPU

scala> ds.filter($"c_customer_sk" > 0).explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuFilter (gpuisnotnull(c_customer_sk#0) AND (c_customer_sk#0 > 0))
   +- GpuGlobalLimit 10
      +- GpuShuffleCoalesce 2147483647
         +- GpuColumnarExchange gpusinglepartitioning(), ENSURE_REQUIREMENTS, [id=#244]
            +- GpuLocalLimit 10
               +- GpuFileGpuScan parquet tpcds.customer[c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/tpcds_100G_parquet/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...

Here the predicate is a column expression that names the column explicitly, and as the plan shows, the Filter runs on the GPU (GpuFilter).
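
The $"..." syntax is only one way to write a column expression. As a minimal sketch, the snippet below builds the same filter three ways on a tiny in-memory Dataset and prints the top node of each analyzed plan. The MiniCustomer class and its two rows are hypothetical stand-ins for tpcds.customer, not part of the original test. All three forms produce an ordinary Catalyst Filter, so they would be expected to behave like the filter above, but only the $"..." form was actually checked on the GPU in this article.

```scala
// Paste into spark-shell, where `spark` and spark.implicits._ are already in scope.
import org.apache.spark.sql.functions.col

// Hypothetical stand-in for the tpcds.customer table used above.
case class MiniCustomer(c_customer_sk: Int, c_customer_id: String)
val mini = Seq(MiniCustomer(1, "AAAA"), MiniCustomer(-7, "BBBB")).toDS()

// Three spellings of the same column-based predicate.
val byDollar = mini.filter($"c_customer_sk" > 0)
val byCol    = mini.filter(col("c_customer_sk") > 0)
val bySql    = mini.filter("c_customer_sk > 0")

// Collect the name of the top node of each analyzed plan:
// a plain Filter in every case, with no lambda involved.
val topNodes = Seq(byDollar, byCol, bySql).map(_.queryExecution.analyzed.nodeName)
println(topNodes)
```

Each variant still returns a Dataset[MiniCustomer], so code downstream of the filter can keep using the typed object.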

3. Not working on GPU

scala> ds.filter(_.c_customer_sk > 0).explain
== Physical Plan ==
*(1) Filter $line23.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3763/0x00000008415be040@75f92fdc.apply
+- GpuColumnarToRow false
   +- GpuGlobalLimit 10
      +- GpuShuffleCoalesce 2147483647
         +- GpuColumnarExchange gpusinglepartitioning(), ENSURE_REQUIREMENTS, [id=#201]
            +- GpuLocalLimit 10
               +- GpuFileGpuScan parquet tpcds.customer[c_customer_sk#0,c_customer_id#1,c_current_cdemo_sk#2,c_current_hdemo_sk#3,c_current_addr_sk#4] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/xxx/data/tpcds_100G_parquet/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...

Here the predicate is a Scala lambda that reads a field of the typed JVM object, which is checked at compile time, so the Filter cannot run on the GPU.
The Filter above is an opaque lambda function in the Catalyst plan, so the plugin cannot see which columns or operations it uses.
Other operators, such as the file scan, still run on the GPU.
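
The difference between the two filters is already visible in the logical plan, before any GPU is involved. The following is a minimal sketch, assuming a plain spark-shell session; the TinyCustomer class and its rows are hypothetical and only stand in for the customer case class above. It compares the top node of the analyzed plan for the lambda version and the column-expression version of the same predicate.

```scala
// Paste into spark-shell, where `spark` and spark.implicits._ are already in scope.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

// Hypothetical stand-in for the customer case class above.
case class TinyCustomer(c_customer_sk: Int, c_customer_id: String)
val tiny: Dataset[TinyCustomer] =
  Seq(TinyCustomer(3, "CCCC"), TinyCustomer(-2, "DDDD")).toDS()

// Typed lambda: Catalyst only holds a Function1 object it cannot inspect.
val typedFilter: Dataset[TinyCustomer] = tiny.filter(_.c_customer_sk > 0)
// Column expression: the predicate is visible to Catalyst as an expression tree.
val columnFilter: Dataset[TinyCustomer] = tiny.filter(col("c_customer_sk") > 0)

// Name of the top node in each analyzed logical plan.
val typedNode  = typedFilter.queryExecution.analyzed.nodeName
val columnNode = columnFilter.queryExecution.analyzed.nodeName
println(s"typed lambda -> $typedNode, column expression -> $columnNode")

// Both versions keep the Dataset[TinyCustomer] type and return the same rows.
val sameRows = typedFilter.collect().toSeq == columnFilter.collect().toSeq
```

The trade-off of the column-expression form is that a misspelled column name is reported when the query is analyzed at run time rather than by the Scala compiler.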

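If we set spark.rapids.sql.explain=NOT_ON_GPU, the log shows the reasons: neither the lambda call (Invoke on a Function1 literal) nor the expressions that turn each row into a customer object (NewInstance, AssertNotNull) have a GPU version: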
!Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
!NOT_FOUND <Invoke> $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3146/0x000000084137e840@f053608.apply cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.Invoke could be found
!Expression <Literal> $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3146/0x000000084137e840@f053608 cannot run on GPU because expression Literal $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3146/0x000000084137e840@f053608 produces an unsupported type ObjectType(interface scala.Function1)
!NOT_FOUND <NewInstance> newInstance(class $line15.$read$$iw$$iw$customer) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.NewInstance could be found
!NOT_FOUND <AssertNotNull> assertnotnull(c_customer_sk#0) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull could be found
@Expression <AttributeReference> c_customer_sk#0 could run on GPU
!NOT_FOUND <Invoke> c_customer_id#1.toString cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.Invoke could be found
@Expression <AttributeReference> c_customer_id#1 could run on GPU
!NOT_FOUND <AssertNotNull> assertnotnull(c_current_cdemo_sk#2) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull could be found
@Expression <AttributeReference> c_current_cdemo_sk#2 could run on GPU
!NOT_FOUND <AssertNotNull> assertnotnull(c_current_hdemo_sk#3) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull could be found
@Expression <AttributeReference> c_current_hdemo_sk#3 could run on GPU
!NOT_FOUND <AssertNotNull> assertnotnull(c_current_addr_sk#4) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull could be found
@Expression <AttributeReference> c_current_addr_sk#4 could run on GPU

