# Goal:

This article explores how Spark calculates Decimal precision and scale in GPU and CPU mode.

Basically, we will test Addition, Subtraction, Multiplication, Division, Modulo and Union in this post.

# Env:

Spark 3.1.1

Rapids accelerator 0.5 snapshot with cuDF 0.19 snapshot jar

# Concept:

Spark's logic to calculate Decimal precision and scale is inside DecimalPrecision.scala.

```
 * In particular, if we have expressions e1 and e2 with precision/scale p1/s1 and p2/s2
 * respectively, then the following operations have the following precision / scale:
 *
 *   Operation    Result Precision                        Result Scale
 *   ------------------------------------------------------------------------
 *   e1 + e2      max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
 *   e1 - e2      max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
 *   e1 * e2      p1 + p2 + 1                             s1 + s2
 *   e1 / e2      p1 - s1 + s2 + max(6, s1 + p2 + 1)      max(6, s1 + p2 + 1)
 *   e1 % e2      min(p1-s1, p2-s2) + max(s1, s2)         max(s1, s2)
 *   e1 union e2  max(s1, s2) + max(p1-s1, p2-s2)         max(s1, s2)
```

This matches Hive's rule in the Hive Decimal Precision/Scale Support document.
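To make the table easier to apply later, here is a small plain-Scala sketch (my own helper, not part of Spark or Hive) that encodes each rule before any overflow adjustment:

```scala
import scala.math.{max, min}

// Hypothetical helper (not in Spark): expected (precision, scale) of the
// result for each operation, following the rules table above.
def resultType(op: String, p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = op match {
  case "+" | "-" => (max(s1, s2) + max(p1 - s1, p2 - s2) + 1, max(s1, s2))
  case "*"       => (p1 + p2 + 1, s1 + s2)
  case "/"       => (p1 - s1 + s2 + max(6, s1 + p2 + 1), max(6, s1 + p2 + 1))
  case "%"       => (min(p1 - s1, p2 - s2) + max(s1, s2), max(s1, s2))
  case "union"   => (max(s1, s2) + max(p1 - s1, p2 - s2), max(s1, s2))
}

// decimal(8,2) + decimal(6,3) => decimal(10,3)
println(resultType("+", 8, 2, 6, 3))
```

We will use these same formulas (inline) to check each test result below.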

Other than that, Spark has a parameter spark.sql.decimalOperations.allowPrecisionLoss (default true): if the needed precision / scale are out of the range of available values, the scale is reduced (down to a minimum of 6), in order to prevent the truncation of the integer part of the decimals.

Now let's look at the limit in GPU mode (with the Rapids accelerator):

Currently, in the Rapids accelerator 0.4.1/0.5 snapshot releases, the limit for decimal is up to 18 digits (64 bits), as per this Doc.

So if the precision is > 18, it will fall back to CPU mode.

Below let's do some tests to confirm the theory matches practice.

# Solution:

## 1. Prepare an example Dataframe with different types of decimal

```scala
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._

spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.rapids.sql.decimalType.enabled", true)

val df = spark.sparkContext.parallelize(Seq(1)).toDF()
val df2 = df.withColumn("value82", lit("123456.78").cast(DecimalType(8,2))).
            withColumn("value63", lit("123.456").cast(DecimalType(6,3))).
            withColumn("value1510", lit("12345.0123456789").cast(DecimalType(15,10))).
            withColumn("value2510", lit("123456789012345.0123456789").cast(DecimalType(25,10)))
df2.write.parquet("/tmp/df2.parquet")
val newdf2 = spark.read.parquet("/tmp/df2.parquet")
newdf2.createOrReplaceTempView("df2")
```
newdf2's schema:
```
scala> newdf2.printSchema
root
 |-- value: integer (nullable = false)
 |-- value82: decimal(8,2) (nullable = true)
 |-- value63: decimal(6,3) (nullable = true)
 |-- value1510: decimal(15,10) (nullable = true)
 |-- value2510: decimal(25,10) (nullable = true)
```

## 2. GPU Mode (Result Decimal within GPU's limit : <=18 digits)

The tests below make sure every result decimal's precision is within the GPU's limit, which is 18 digits in this Rapids accelerator version.

So we only use 2 fields of df2 -- value82: decimal(8,2) and value63: decimal(6,3).

This is to confirm whether the theory holds in GPU mode.

To apply the above concept/theory to the expected result precision and scale, let's use the code below to do the math easily:

```scala
import scala.math.{max, min}

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)
```

### 2.1 Addition

```scala
val df_plus = spark.sql("SELECT value82+value63 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect
```

Output:

```
scala> val df_plus=spark.sql("SELECT value82+value63 FROM df2")
df_plus: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3)]

scala> df_plus.printSchema
root
 |-- (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3) (nullable = true)

scala> df_plus.explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [gpucheckoverflow((gpupromoteprecision(cast(value82#58 as decimal(10,3))) + gpupromoteprecision(cast(value63#59 as decimal(10,3)))), DecimalType(10,3), true) AS (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3)))#88]
   +- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>

scala> df_plus.collect
res21: Array[org.apache.spark.sql.Row] = Array([123580.236])
```

The result Decimal is (10,3), which matches the theory, and the explain output shows it runs on the GPU.

```
scala> max(s1, s2) + max(p1-s1, p2-s2) + 1
res7: Int = 10

scala> max(s1, s2)
res8: Int = 3
```

Note: In the following tests, I will just show the result instead of printing the full output, to keep this post short. But feel free to do the math yourself.

### 2.2 Subtraction

```scala
// Result Decimal (10,3)
val df_minus = spark.sql("SELECT value82-value63 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect
```

### 2.3 Multiplication

```scala
// Result Decimal (15,5)
val df_multi = spark.sql("SELECT value82*value63 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
```
Output:
```
scala> val df_multi=spark.sql("SELECT value82*value63 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5)]

scala> df_multi.printSchema
root
 |-- (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5) (nullable = true)

scala> df_multi.explain
21/05/04 18:02:21 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
  @Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96 could run on GPU
    @Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) could run on GPU
      !Expression <Multiply> (promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(19,6)
        @Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(9,3))) could run on GPU
          @Expression <Cast> cast(value82#58 as decimal(9,3)) could run on GPU
            @Expression <AttributeReference> value82#58 could run on GPU
        @Expression <PromotePrecision> promote_precision(cast(value63#59 as decimal(9,3))) could run on GPU
          @Expression <Cast> cast(value63#59 as decimal(9,3)) could run on GPU
            @Expression <AttributeReference> value63#59 could run on GPU

== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96]
+- GpuColumnarToRow false
   +- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>

scala> df_multi.collect
res27: Array[org.apache.spark.sql.Row] = Array([15241480.23168])
```
Here, even though the result Decimal is only (15,5), it still falls back to the CPU.
This is because Spark inserts "PromotePrecision" to CAST both sides to the same type -- Decimal(9,3).
Currently the GPU has to be very cautious about this PromotePrecision, so it treats the result as Decimal(19,6) instead of (15,5).
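For reference, the (15,5) that Spark reports does match the rules table. A quick plain-Scala check (my own snippet, independent of Spark):

```scala
val (p1, s1) = (8, 2)  // value82: decimal(8,2)
val (p2, s2) = (6, 3)  // value63: decimal(6,3)

// e1 * e2: precision = p1 + p2 + 1, scale = s1 + s2
val precision = p1 + p2 + 1  // 8 + 6 + 1 = 15
val scale     = s1 + s2      // 2 + 3 = 5
```

Applying the same multiplication rule to the promoted Decimal(9,3) inputs gives 9 + 9 + 1 = 19 and 3 + 3 = 6, which is the (19,6) the GPU is being cautious about.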

### 2.4 Division

```scala
// Result Decimal (18,9) -- Fallback on CPU
val df_div = spark.sql("SELECT value82/value63 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
```
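The (18,9) can be verified against the division rule with a quick plain-Scala check (my own snippet, independent of Spark):

```scala
import scala.math.max

val (p1, s1) = (8, 2)  // value82: decimal(8,2)
val (p2, s2) = (6, 3)  // value63: decimal(6,3)

// e1 / e2: scale = max(6, s1 + p2 + 1), precision = p1 - s1 + s2 + scale
val scale     = max(6, s1 + p2 + 1)   // max(6, 9) = 9
val precision = p1 - s1 + s2 + scale  // 8 - 2 + 3 + 9 = 18
```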

### 2.5 Modulo

```scala
// Result Decimal (6,3) -- Fallback on CPU
val df_mod = spark.sql("SELECT value82 % value63 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
```
Note: this is because Modulo is not supported for Decimal on the GPU, as per supported_ops.md.
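The (6,3) itself still follows the modulo rule from the table; a quick plain-Scala check (my own snippet):

```scala
import scala.math.{max, min}

val (p1, s1) = (8, 2)  // value82: decimal(8,2)
val (p2, s2) = (6, 3)  // value63: decimal(6,3)

// e1 % e2: precision = min(p1-s1, p2-s2) + max(s1, s2), scale = max(s1, s2)
val precision = min(p1 - s1, p2 - s2) + max(s1, s2)  // min(6, 3) + 3 = 6
val scale     = max(s1, s2)                          // 3
```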

### 2.6 Union

```scala
// Result Decimal (9,3)
val df_union = spark.sql("SELECT value82 from df2 union SELECT value63 from df2")
df_union.printSchema
df_union.explain
df_union.collect
```
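Note that union has no "+ 1" in its precision rule, which is why the result is (9,3) rather than (10,3); a quick plain-Scala check (my own snippet):

```scala
import scala.math.max

val (p1, s1) = (8, 2)  // value82: decimal(8,2)
val (p2, s2) = (6, 3)  // value63: decimal(6,3)

// e1 union e2: precision = max(s1, s2) + max(p1-s1, p2-s2), scale = max(s1, s2)
val precision = max(s1, s2) + max(p1 - s1, p2 - s2)  // 3 + 6 = 9
val scale     = max(s1, s2)                          // 3
```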

## 3. GPU Mode fallback to CPU (19 ~ 38 digits)

The tests below may fall back to CPU if the result decimal's precision is above the GPU's limit.

So we only use 2 fields of df2 -- value82: decimal(8,2) and value1510: decimal(15,10).

### 3.1 Addition

```scala
// Result Decimal (17,10) -- within GPU limit
val df_plus = spark.sql("SELECT value82+value1510 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect
```
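The expected (17,10) follows from the addition rule; a quick plain-Scala check (my own snippet):

```scala
import scala.math.max

val (p1, s1) = (8, 2)    // value82: decimal(8,2)
val (p2, s2) = (15, 10)  // value1510: decimal(15,10)

// e1 + e2: precision = max(s1, s2) + max(p1-s1, p2-s2) + 1, scale = max(s1, s2)
val precision = max(s1, s2) + max(p1 - s1, p2 - s2) + 1  // 10 + 6 + 1 = 17
val scale     = max(s1, s2)                              // 10
```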

### 3.2 Subtraction

```scala
// Result Decimal (17,10) -- within GPU limit
val df_minus = spark.sql("SELECT value82-value1510 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect
```

### 3.3 Multiplication

```scala
// Result Decimal (24,12) -- outside of GPU limit
val df_multi = spark.sql("SELECT value82*value1510 FROM df2")
df_multi.printSchema
df_multi.explain
```
Output:
```
scala> val df_multi=spark.sql("SELECT value82*value1510 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12)]

scala> df_multi.printSchema
root
 |-- (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12) (nullable = true)

scala> df_multi.explain
21/05/04 18:44:46 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced; unsupported data types in output: DecimalType(24,12)
  !Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 cannot run on GPU because expression Alias CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 produces an unsupported type DecimalType(24,12); expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
    !Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) cannot run on GPU because expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
      !Expression <Multiply> (promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(33,20)
        @Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(16,10))) could run on GPU
          @Expression <Cast> cast(value82#58 as decimal(16,10)) could run on GPU
            @Expression <AttributeReference> value82#58 could run on GPU
        @Expression <PromotePrecision> promote_precision(cast(value1510#60 as decimal(16,10))) could run on GPU
          @Expression <Cast> cast(value1510#60 as decimal(16,10)) could run on GPU
            @Expression <AttributeReference> value1510#60 could run on GPU

== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132]
+- GpuColumnarToRow false
   +- GpuFileGpuScan parquet [value82#58,value1510#60] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value1510:decimal(15,10)>

scala> df_multi.collect
res51: Array[org.apache.spark.sql.Row] = Array([1524075473.257763907942])
```
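The (24,12) again follows from the multiplication rule; a quick plain-Scala check (my own snippet):

```scala
val (p1, s1) = (8, 2)    // value82: decimal(8,2)
val (p2, s2) = (15, 10)  // value1510: decimal(15,10)

// e1 * e2: precision = p1 + p2 + 1, scale = s1 + s2
val precision = p1 + p2 + 1  // 8 + 15 + 1 = 24
val scale     = s1 + s2      // 2 + 10 = 12
```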

### 3.4 Division

```scala
// Result Decimal (34,18) -- outside of GPU limit
val df_div = spark.sql("SELECT value82/value1510 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
```
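The (34,18) matches the division rule; a quick plain-Scala check (my own snippet):

```scala
import scala.math.max

val (p1, s1) = (8, 2)    // value82: decimal(8,2)
val (p2, s2) = (15, 10)  // value1510: decimal(15,10)

// e1 / e2: scale = max(6, s1 + p2 + 1), precision = p1 - s1 + s2 + scale
val scale     = max(6, s1 + p2 + 1)   // max(6, 18) = 18
val precision = p1 - s1 + s2 + scale  // 8 - 2 + 10 + 18 = 34
```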

### 3.5 Modulo

```scala
// Result Decimal (15,10) -- within GPU limit, but fallback on CPU
val df_mod = spark.sql("SELECT value82 % value1510 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
```
Note: this is because Modulo is not supported for Decimal on the GPU, as per supported_ops.md.
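The (15,10) still follows the modulo rule; a quick plain-Scala check (my own snippet):

```scala
import scala.math.{max, min}

val (p1, s1) = (8, 2)    // value82: decimal(8,2)
val (p2, s2) = (15, 10)  // value1510: decimal(15,10)

// e1 % e2: precision = min(p1-s1, p2-s2) + max(s1, s2), scale = max(s1, s2)
val precision = min(p1 - s1, p2 - s2) + max(s1, s2)  // min(6, 5) + 10 = 15
val scale     = max(s1, s2)                          // 10
```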

### 3.6 Union

```scala
// Result Decimal (16,10) -- within GPU limit
val df_union = spark.sql("SELECT value82 from df2 union SELECT value1510 from df2")
df_union.printSchema
df_union.explain
df_union.collect
```
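The (16,10) follows the union rule (no "+ 1" term); a quick plain-Scala check (my own snippet):

```scala
import scala.math.max

val (p1, s1) = (8, 2)    // value82: decimal(8,2)
val (p2, s2) = (15, 10)  // value1510: decimal(15,10)

// e1 union e2: precision = max(s1, s2) + max(p1-s1, p2-s2), scale = max(s1, s2)
val precision = max(s1, s2) + max(p1 - s1, p2 - s2)  // 10 + 6 = 16
val scale     = max(s1, s2)                          // 10
```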

## 4. Above decimal max range (> 38 digits)

If the result decimal is above 38 digits, spark.sql.decimalOperations.allowPrecisionLoss can be used to control the behavior.
So we only use 2 fields of df2 -- value1510: decimal(15,10) and value2510: decimal(25,10).
```scala
// Result Decimal (38,17)
val df_multi = spark.sql("SELECT value1510*value2510 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
```
As per the theory, the result decimal should be (41,20):
```
scala> val (p1,s1)=(15,10)
p1: Int = 15
s1: Int = 10

scala> val (p2,s2)=(25,10)
p2: Int = 25
s2: Int = 10

scala> p1 + p2 + 1
res31: Int = 41

scala> s1 + s2
res32: Int = 20
```
However, since 41 > 38, another function, adjustPrecisionScale inside DecimalType.scala, is called to adjust the precision and scale.
For this specific example, the code logic below is applied:
```scala
    } else {
      // Precision/scale exceed maximum precision. Result must be adjusted to MAX_PRECISION.
      val intDigits = precision - scale
      // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; otherwise
      // preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
      val minScaleValue = Math.min(scale, MINIMUM_ADJUSTED_SCALE)
      // The resulting scale is the maximum between what is available without causing a loss of
      // digits for the integer part of the decimal and the minimum guaranteed scale, which is
      // computed above
      val adjustedScale = Math.max(MAX_PRECISION - intDigits, minScaleValue)
      DecimalType(MAX_PRECISION, adjustedScale)
    }
```
That is why the result decimal is (38,17).
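Plugging the unadjusted (41,20) into that logic, with MAX_PRECISION = 38 and MINIMUM_ADJUSTED_SCALE = 6 as in Spark's DecimalType, reproduces the adjustment step by step (my own standalone snippet):

```scala
val MAX_PRECISION = 38
val MINIMUM_ADJUSTED_SCALE = 6

val (precision, scale) = (41, 20)  // unadjusted result of decimal(15,10) * decimal(25,10)

val intDigits     = precision - scale                        // 41 - 20 = 21
val minScaleValue = math.min(scale, MINIMUM_ADJUSTED_SCALE)  // min(20, 6) = 6
// Adjusted type: decimal(38,17)
val adjustedScale = math.max(MAX_PRECISION - intDigits, minScaleValue)  // max(17, 6) = 17
```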

Since the above function is only called when spark.sql.decimalOperations.allowPrecisionLoss=true, if we set it to false, the result will be null:
```
scala> df_multi.collect
res67: Array[org.apache.spark.sql.Row] = Array([null])
```