Tuesday, April 27, 2021

Rapids Accelerator compatibility related to spark.sql.legacy.parquet.datetimeRebaseModeInWrite

Goal:

This article talked about the compatibility of Rapids Accelerator for Spark regarding parquet writing related to parameters spark.sql.legacy.parquet.datetimeRebaseModeInWrite etc.

Env:

Spark 3.1.1

Rapids Accelerator for Spark 0.5 snapshot

Solution:

Spark 3.0 made the change to use Proleptic Gregorian calendar instead of hybrid Gregorian+Julian calendar. So it caused some trouble when reading/writing to/from old "legacy" format from Spark 2.x.

Here is a nice blog to explain the change, and I would strongly recommend read it firstly.

  • SPARK-31405 (starting from 3.0) introduced parameter spark.sql.legacy.parquet.datetimeRebaseModeInWrite which influences on writes of the following parquet logical types:DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS. 
  • SPARK-33210 (starting from 3.1) introduced another parameter spark.sql.legacy.parquet.int96RebaseModeInWrite for INT96 type(timestamp).

Here are 3 values:

  • EXCEPTION (Default): Spark will fail the writing if it sees ancient dates/timestamps that are ambiguous between the two calendars.
  • LEGACY: Spark will rebase dates/timestamps from Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files
  • CORRECTED: Spark will not do rebase and write the dates/timestamps as it is.

In CPU mode, let's firstly look at the behaviors.

1. CPU Mode

1.1 EXCEPTION (Default)

import java.sql.Date
spark.conf.set("spark.rapids.sql.enabled", false)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_exception")

It will fail with:

Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: 
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous,
as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar.
See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability.
Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is,
if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.

1.2 LEGACY

spark.conf.set("spark.rapids.sql.enabled", false)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_legacy")
spark.read.parquet("/tmp/testparquet_legacy").createOrReplaceTempView("date_legacy")
spark.sql("SELECT * FROM date_legacy").explain
spark.sql("SELECT * FROM date_legacy").show

Output:

scala> spark.sql("SELECT * FROM date_legacy").explain
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [dt#30] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet_legacy], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>


scala> spark.sql("SELECT * FROM date_legacy").show
+----------+
| dt|
+----------+
|1500-12-25|
+----------+

1.3 CORRECTED

spark.conf.set("spark.rapids.sql.enabled", false)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_corrected")
spark.read.parquet("/tmp/testparquet_corrected").createOrReplaceTempView("date_corrected")
spark.sql("SELECT * FROM date_corrected").explain
spark.sql("SELECT * FROM date_corrected").show

Output:

scala> spark.sql("SELECT * FROM date_corrected").explain
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [dt#46] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet_corrected], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>


scala> spark.sql("SELECT * FROM date_corrected").show
+----------+
| dt|
+----------+
|1500-12-25|
+----------+

2. GPU Mode

2.1 EXCEPTION (Default)

import java.sql.Date
spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_exception")

It will fail with:

Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: 
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous,
as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar.
See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability.
Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is,
if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.

2.2 LEGACY

spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_legacy")
spark.read.parquet("/tmp/testparquet_legacy").createOrReplaceTempView("date_legacy")
spark.sql("SELECT * FROM date_legacy").explain
spark.sql("SELECT * FROM date_legacy").show

The data writing can finish successfully since we use LEGACY value, but it is done by CPU instead of GPU(see the warning message"Output <InsertIntoHadoopFsRelationCommand> cannot run on GPU because LEGACY rebase mode for dates and timestamps is not supported"):

scala> Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_legacy")
21/04/28 01:29:27 WARN GpuOverrides:
!Exec <DataWritingCommandExec> cannot run on GPU because not all data writing commands can be replaced
!Output <InsertIntoHadoopFsRelationCommand> cannot run on GPU because LEGACY rebase mode for dates and timestamps is not supported
!NOT_FOUND <LocalTableScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.LocalTableScanExec could be found
@Expression <AttributeReference> dt#66 could run on GPU

Spark UI can show the query plan which is on CPU as well:

The data reading fails with below error message and suggest us to set spark.sql.legacy.parquet.datetimeRebaseModeInRead to CORRECTED.
scala> spark.sql("SELECT * FROM date_legacy").explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuFileGpuScan parquet [dt#69] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet_legacy], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>


scala> spark.sql("SELECT * FROM date_legacy").show
21/04/28 01:29:28 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19) (111.111.111.111 executor 0): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. The RAPIDS Accelerator does not support reading these 'LEGACY' files. To do so you should disable Parquet support in the RAPIDS Accelerator or set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'CORRECTED' to read the datetime values as it is.

Even after setting spark.sql.legacy.parquet.datetimeRebaseModeInRead to CORRECTED or LEGACY, it still fails with the same error.

2.3 CORRECTED

spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_corrected")
spark.read.parquet("/tmp/testparquet_corrected").createOrReplaceTempView("date_corrected")
spark.sql("SELECT * FROM date_corrected").explain
spark.sql("SELECT * FROM date_corrected").show

The data writing can finish successfully on GPU since we use CORRECTED value:

scala> Seq(Date.valueOf("1500-12-25")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_corrected")
21/04/28 01:58:23 WARN GpuOverrides:
!NOT_FOUND <LocalTableScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.LocalTableScanExec could be found
@Expression <AttributeReference> dt#140 could run on GPU

Spark UI can show the query plan which is on GPU as well:

The data reading also works fine on GPU:
scala> spark.sql("SELECT * FROM date_corrected").explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuFileGpuScan parquet [dt#143] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet_corrected], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>


scala> spark.sql("SELECT * FROM date_corrected").show
+----------+
| dt|
+----------+
|1500-12-25|
+----------+

3. Int96 timestamp tests

Of course, we can do similar tests for int96 timestamp type using below scripts. 

Here I will let you try it out.

spark.conf.set("spark.rapids.sql.enabled", true)

spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "EXCEPTION")
Seq(java.sql.Timestamp.valueOf("1500-01-01 00:00:00")).toDF("ts").write.format("parquet").mode("overwrite").save("/tmp/testparquet_exception")
spark.read.parquet("/tmp/testparquet_exception").createOrReplaceTempView("ts_exception")
spark.sql("SELECT * FROM ts_exception").explain
spark.sql("SELECT * FROM ts_exception").show

spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
Seq(java.sql.Timestamp.valueOf("1500-01-01 00:00:00")).toDF("ts").write.format("parquet").mode("overwrite").save("/tmp/testparquet_legacy")
spark.read.parquet("/tmp/testparquet_legacy").createOrReplaceTempView("ts_legacy")
spark.sql("SELECT * FROM ts_legacy").explain
spark.sql("SELECT * FROM ts_legacy").show

spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
Seq(java.sql.Timestamp.valueOf("1500-01-01 00:00:00")).toDF("ts").write.format("parquet").mode("overwrite").save("/tmp/testparquet_corrected")
spark.read.parquet("/tmp/testparquet_corrected").createOrReplaceTempView("ts_corrected")
spark.sql("SELECT * FROM ts_corrected").explain
spark.sql("SELECT * FROM ts_corrected").show

4. 1582-10-15 behaviors

As you remember, the error message shows that "reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous".

Here we focus on date which is 1582-10-15.

Let's use below sample test program on both CPU mode and GPU mode, and change the date "1582-10-15" to older dates in the following tests.

spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
Seq(Date.valueOf("1582-10-15")).toDF("dt").write.format("parquet").mode("overwrite").save("/tmp/testparquet_legacy")

spark.read.parquet("/tmp/testparquet_legacy").createOrReplaceTempView("date_legacy")
spark.sql("SELECT * FROM date_legacy").explain
spark.sql("SELECT * FROM date_legacy").show

4.1 1582-10-15

Both CPU and GPU Modes can successfully read it as 1582-10-15:

scala> spark.sql("SELECT * FROM date_legacy").show
+----------+
| dt|
+----------+
|1582-10-15|
+----------+

4.2 1582-10-14

Both CPU and GPU Modes started to show ambiguous result: 1582-10-24 which is "original date"+10:

scala> spark.sql("SELECT * FROM date_legacy").show
+----------+
| dt|
+----------+
|1582-10-24|
+----------+
This "original date"+10 behavior lasts until 1582-10-05.

4.3 1582-10-04

CPU Mode can successfully read it as 1582-10-04 going forward:

scala> spark.sql("SELECT * FROM date_legacy").show
+----------+
| dt|
+----------+
|1582-10-04|
+----------+

However GPU Mode will fail since 1582-10-04:

Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. The RAPIDS Accelerator does not support reading these 'LEGACY' files. To do so you should disable Parquet support in the RAPIDS Accelerator or set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'CORRECTED' to read the datetime values as it is.
at org.apache.spark.sql.rapids.execution.TrampolineUtil$.makeSparkUpgradeException(TrampolineUtil.scala:78)
at com.nvidia.spark.RebaseHelper$.newRebaseExceptionInRead(RebaseHelper.scala:83)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.$anonfun$readToTable$3(GpuParquetScan.scala:1162)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.$anonfun$readToTable$2(GpuParquetScan.scala:1160)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.$anonfun$readToTable$2$adapted(GpuParquetScan.scala:1158)
at com.nvidia.spark.rapids.Arm.closeOnExcept(Arm.scala:76)
at com.nvidia.spark.rapids.Arm.closeOnExcept$(Arm.scala:74)
at com.nvidia.spark.rapids.FileParquetPartitionReaderBase.closeOnExcept(GpuParquetScan.scala:504)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.readToTable(GpuParquetScan.scala:1158)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.$anonfun$readBatch$1(GpuParquetScan.scala:1113)
at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
at com.nvidia.spark.rapids.FileParquetPartitionReaderBase.withResource(GpuParquetScan.scala:504)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.readBatch(GpuParquetScan.scala:1098)
at com.nvidia.spark.rapids.MultiFileParquetPartitionReader.next(GpuParquetScan.scala:926)
at com.nvidia.spark.rapids.PartitionIterator.hasNext(GpuDataSourceRDD.scala:59)
at com.nvidia.spark.rapids.MetricsBatchIterator.hasNext(GpuDataSourceRDD.scala:76)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.rapids.GpuFileSourceScanExec$$anon$1.hasNext(GpuFileSourceScanExec.scala:385)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at com.nvidia.spark.rapids.GpuBaseLimitExec$$anon$1.hasNext(limit.scala:62)
at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:208)
at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:225)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

 

==


No comments:

Post a Comment

Popular Posts