Friday, March 19, 2021

Understanding RAPIDS Accelerator For Apache Spark's supported timezone

Goal:

This article explains the currently supported timezone behavior for the "RAPIDS Accelerator For Apache Spark".

Env:

RAPIDS Accelerator For Apache Spark 0.4

Concept:

As the current 0.4 documentation mentions:

operations involving timestamps will only be GPU-accelerated if the time zone used by the JVM is UTC.

This means that if the JVM timezone of the Spark job is not UTC, operations involving timestamps will fall back to the CPU, which results in performance overhead.
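To see why the JVM timezone matters for correctness, consider how the same wall-clock timestamp string resolves to different epoch instants under different timezones. This is a plain-Python illustration, independent of Spark, using the same timestamp value as the tests below:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# The same wall-clock string maps to different epoch instants
# depending on the timezone used to interpret it.
wall_clock = datetime(2021, 12, 25, 11, 11, 11)

as_utc = wall_clock.replace(tzinfo=timezone.utc).timestamp()
as_pst = wall_clock.replace(tzinfo=ZoneInfo("America/Los_Angeles")).timestamp()

# PST is UTC-8 in December, so interpreting the same string in PST
# yields an epoch value 8 hours (28800 seconds) later than in UTC.
print(as_pst - as_utc)  # 28800.0
```

Because the GPU kernels assume UTC semantics, the plugin cannot simply run these operations when the JVM is in another timezone; it must hand them back to the CPU.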

This applies to both supported and non-supported timestamp format conversions.

Note: supported timestamp formats are documented in this Compatibility doc.

Test:

The Spark cluster nodes below are using the PST timezone.

1. PST JVM timezone + supported timestamp format

Let's start a spark-shell without any JVM timezone change and run the timestamp conversion below on a supported format:

scala> val df_supported = Seq(("2021-12-25 11:11:11")).toDF("ts")
df_supported: org.apache.spark.sql.DataFrame = [ts: string]

scala> df_supported.write.format("parquet").mode("overwrite").save("/tmp/testts_supported.parquet")
21/03/19 21:58:29 WARN GpuOverrides:
!NOT_FOUND <LocalTableScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.LocalTableScanExec could be found
@Expression <AttributeReference> ts#4 could run on GPU


scala> spark.read.parquet("/tmp/testts_supported.parquet").createOrReplaceTempView("df_supported")

scala> spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").explain
21/03/19 21:58:31 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because unsupported data types in output: TimestampType; not all expressions can be replaced
!Expression <Alias> gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) AS to_timestamp(ts, yyyy-MM-dd HH:mm:ss)#9 cannot run on GPU because expression Alias gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) AS to_timestamp(ts, yyyy-MM-dd HH:mm:ss)#9 produces an unsupported type TimestampType; expression GetTimestamp gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) produces an unsupported type TimestampType
!Expression <GetTimestamp> gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) cannot run on GPU because expression GetTimestamp gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) produces an unsupported type TimestampType
@Expression <AttributeReference> ts#7 could run on GPU
@Expression <Literal> yyyy-MM-dd HH:mm:ss could run on GPU

== Physical Plan ==
*(1) Project [gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) AS to_timestamp(ts, yyyy-MM-dd HH:mm:ss)#9]
+- GpuColumnarToRow false
   +- GpuFileGpuScan parquet [ts#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testts_supported.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ts:string>



scala> spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").show
21/03/19 21:58:31 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
@Expression <Alias> cast(gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) as string) AS to_timestamp(ts, yyyy-MM-dd HH:mm:ss)#14 could run on GPU
@Expression <Cast> cast(gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) as string) could run on GPU
!Expression <GetTimestamp> gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) cannot run on GPU because expression GetTimestamp gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) produces an unsupported type TimestampType
@Expression <AttributeReference> ts#7 could run on GPU
@Expression <Literal> yyyy-MM-dd HH:mm:ss could run on GPU

+-------------------------------------+
|to_timestamp(ts, yyyy-MM-dd HH:mm:ss)|
+-------------------------------------+
|                  2021-12-25 11:11:11|
+-------------------------------------+

As you can see above, the "to_timestamp" operation falls back to CPU mode, indicated by the keyword "Project" in the query plan.

In the Spark UI's query plan, we can see "GpuColumnarToRow" and "GpuRowToColumnar".

This indicates performance overhead, since data is moved between the GPU and CPU.
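As a quick way to spot such transitions, you can scan the text of an explain() plan for the two transition operator names. This is a hypothetical helper (not part of the plugin), assuming the plan has been captured as a string; the operator names are taken from the plans shown above:

```python
def find_gpu_cpu_transitions(plan: str):
    """Return the GPU<->CPU transition operators found in a plan string.

    Hypothetical helper: it simply scans for the two transition operator
    names that indicate data movement between GPU and CPU.
    """
    markers = ("GpuColumnarToRow", "GpuRowToColumnar")
    return [m for m in markers if m in plan]

# Example plan text, abbreviated from the explain() output above.
plan = """*(1) Project [gettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, Some(UTC), false) AS ...]
+- GpuColumnarToRow false
   +- GpuFileGpuScan parquet [ts#7] ..."""

print(find_gpu_cpu_transitions(plan))  # ['GpuColumnarToRow']
```

A plan with no transition operators at all means the query ran entirely on one side (all GPU or all CPU).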

2. UTC JVM timezone + supported timestamp format

To make supported timestamp operations run on the GPU, we do not need to change the timezone of the machines themselves, even if it is not UTC.

We just need to change the JVM timezone for the driver and executors.

The method is described in this Doc:

  • spark.driver.extraJavaOptions should include -Duser.timezone=UTC
  • spark.executor.extraJavaOptions should include -Duser.timezone=UTC
  • spark.sql.session.timeZone=UTC

Then run the same tests in spark-shell after changing the JVM timezone to UTC:

spark-shell --conf spark.sql.session.timeZone=UTC --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
scala> val df_supported = Seq(("2021-12-25 11:11:11")).toDF("ts")
df_supported: org.apache.spark.sql.DataFrame = [ts: string]

scala> df_supported.write.format("parquet").mode("overwrite").save("/tmp/testts_supported.parquet")
21/03/20 06:11:56 WARN GpuOverrides:
!NOT_FOUND <LocalTableScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.LocalTableScanExec could be found
@Expression <AttributeReference> ts#4 could run on GPU


scala> spark.read.parquet("/tmp/testts_supported.parquet").createOrReplaceTempView("df_supported")

scala> spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [gpugettimestamp(ts#7, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss, %Y-%m-%d %H:%M:%S, None) AS to_timestamp(ts, yyyy-MM-dd HH:mm:ss)#9]
   +- GpuFileGpuScan parquet [ts#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testts_supported.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ts:string>



scala> spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").show
+-------------------------------------+
|to_timestamp(ts, yyyy-MM-dd HH:mm:ss)|
+-------------------------------------+
|                  2021-12-25 11:11:11|
+-------------------------------------+

As you can see above, the "to_timestamp" operation now runs in GPU mode, indicated by the keyword "GpuProject" in the query plan. The Spark UI shows the same.

3. UTC JVM timezone + non-supported timestamp format

Non-supported timestamp formats will still fall back to CPU mode.

For example: "MMM" is not supported in 0.4.

scala> val df_notsupported = Seq(("2021-Dec-25 11:11:11")).toDF("ts")
df_notsupported: org.apache.spark.sql.DataFrame = [ts: string]

scala> df_notsupported.write.format("parquet").mode("overwrite").save("/tmp/testts_notsupported.parquet")
21/03/20 06:15:49 WARN GpuOverrides:
!NOT_FOUND <LocalTableScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.LocalTableScanExec could be found
@Expression <AttributeReference> ts#22 could run on GPU


scala> spark.read.parquet("/tmp/testts_notsupported.parquet").createOrReplaceTempView("df_notsupported")

scala> spark.sql("select to_timestamp(ts, 'yyyy-MMM-dd HH:mm:ss') from df_notsupported").explain
21/03/20 06:15:50 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
@Expression <Alias> gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) AS to_timestamp(ts, yyyy-MMM-dd HH:mm:ss)#27 could run on GPU
!Expression <GetTimestamp> gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) cannot run on GPU because incompatible format 'yyyy-MMM-dd HH:mm:ss'. Set spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.
@Expression <AttributeReference> ts#25 could run on GPU
@Expression <Literal> yyyy-MMM-dd HH:mm:ss could run on GPU

== Physical Plan ==
*(1) Project [gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) AS to_timestamp(ts, yyyy-MMM-dd HH:mm:ss)#27]
+- GpuColumnarToRow false
   +- GpuFileGpuScan parquet [ts#25] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testts_notsupported.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ts:string>



scala> spark.sql("select to_timestamp(ts, 'yyyy-MMM-dd HH:mm:ss') from df_notsupported").show
21/03/20 06:15:51 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
@Expression <Alias> cast(gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) as string) AS to_timestamp(ts, yyyy-MMM-dd HH:mm:ss)#32 could run on GPU
@Expression <Cast> cast(gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) as string) could run on GPU
!Expression <GetTimestamp> gettimestamp(ts#25, yyyy-MMM-dd HH:mm:ss, Some(UTC), false) cannot run on GPU because incompatible format 'yyyy-MMM-dd HH:mm:ss'. Set spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.
@Expression <AttributeReference> ts#25 could run on GPU
@Expression <Literal> yyyy-MMM-dd HH:mm:ss could run on GPU

+--------------------------------------+
|to_timestamp(ts, yyyy-MMM-dd HH:mm:ss)|
+--------------------------------------+
|                   2021-12-25 11:11:11|
+--------------------------------------+


Below is the same test code for PySpark users:

from pyspark.sql.functions import to_timestamp
from pyspark.sql import Row
df_supported=sc.parallelize([Row(ts='2021-12-25 11:11:11')]).toDF()
df_supported.write.format("parquet").mode("overwrite").save("/tmp/testts_supported.parquet")
spark.read.parquet('/tmp/testts_supported.parquet').createOrReplaceTempView("df_supported")
spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").explain()
spark.sql("select to_timestamp(ts, 'yyyy-MM-dd HH:mm:ss') from df_supported").show()

df_notsupported=sc.parallelize([Row(ts='2021-Dec-25 11:11:11')]).toDF()
df_notsupported.write.format("parquet").mode("overwrite").save("/tmp/testts_notsupported.parquet")
spark.read.parquet('/tmp/testts_notsupported.parquet').createOrReplaceTempView("df_notsupported")
spark.sql("select to_timestamp(ts, 'yyyy-MMM-dd HH:mm:ss') from df_notsupported").explain()
spark.sql("select to_timestamp(ts, 'yyyy-MMM-dd HH:mm:ss') from df_notsupported").show()

Note: there is a parameter, "spark.rapids.sql.incompatibleDateFormats.enabled", which is documented as follows:

"When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs."
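For example, to force parsing of an incompatible format such as "MMM" onto the GPU anyway, this flag can be added to the spark-shell command used earlier. This is a config fragment only; per the description above, it may produce incorrect results when parsing invalid inputs, so use it with care:

```shell
spark-shell --conf spark.sql.session.timeZone=UTC \
  --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
  --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC" \
  --conf spark.rapids.sql.incompatibleDateFormats.enabled=true
```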
