Goal:
This article researches how Spark calculates the Decimal precision and scale in GPU and CPU mode.
We will test Addition, Subtraction, Multiplication, Division, Modulo and Union in this post.
Env:
Spark 3.1.1
Rapids accelerator 0.5 snapshot with cuDF 0.19 snapshot jar
Concept:
Spark's logic to calculate the Decimal precision and scale lives in DecimalPrecision.scala:
* In particular, if we have expressions e1 and e2 with precision/scale p1/s1 and p2/s2
* respectively, then the following operations have the following precision / scale:
*
*   Operation    Result Precision                        Result Scale
*   ------------------------------------------------------------------------
*   e1 + e2      max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
*   e1 - e2      max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
*   e1 * e2      p1 + p2 + 1                             s1 + s2
*   e1 / e2      p1 - s1 + s2 + max(6, s1 + p2 + 1)      max(6, s1 + p2 + 1)
*   e1 % e2      min(p1-s1, p2-s2) + max(s1, s2)         max(s1, s2)
*   e1 union e2  max(s1, s2) + max(p1-s1, p2-s2)         max(s1, s2)
This matches Hive's rules as described in the Hive Decimal Precision/Scale Support document.
In addition, Spark has a parameter spark.sql.decimalOperations.allowPrecisionLoss (default true). When it is true and the required precision/scale are out of the range of available values (precision above 38), the scale is reduced, down to a minimum of 6 (or the original scale, if that is smaller), in order to prevent truncation of the integer part of the decimal.
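To make the table and the precision-loss adjustment concrete, here is a minimal Scala sketch that mirrors them as plain functions. `DecimalRules`, `adjust` and `resultType` are hypothetical helpers for this post, not Spark APIs. It assumes allowPrecisionLoss=true and non-negative scales; the constants 38 and 6 correspond to Spark's `DecimalType.MAX_PRECISION` and `DecimalType.MINIMUM_ADJUSTED_SCALE`.

```scala
import scala.math.{max, min}

// Hypothetical helper mirroring the rule table (allowPrecisionLoss = true).
object DecimalRules {
  val MaxPrecision = 38     // same value as DecimalType.MAX_PRECISION
  val MinAdjustedScale = 6  // same value as DecimalType.MINIMUM_ADJUSTED_SCALE

  // Only applies when precision > 38: prefer keeping all integer digits,
  // but keep at least min(scale, 6) fractional digits.
  def adjust(precision: Int, scale: Int): (Int, Int) =
    if (precision <= MaxPrecision) (precision, scale)
    else {
      val intDigits = precision - scale
      val minScale = min(scale, MinAdjustedScale)
      (MaxPrecision, max(MaxPrecision - intDigits, minScale))
    }

  // Result (precision, scale) of e1 <op> e2 for inputs p1/s1 and p2/s2.
  def resultType(op: String, p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = op match {
    case "+" | "-" =>
      adjust(max(s1, s2) + max(p1 - s1, p2 - s2) + 1, max(s1, s2))
    case "*" =>
      adjust(p1 + p2 + 1, s1 + s2)
    case "/" =>
      val scale = max(MinAdjustedScale, s1 + p2 + 1)
      adjust(p1 - s1 + s2 + scale, scale)
    case "%" =>
      adjust(min(p1 - s1, p2 - s2) + max(s1, s2), max(s1, s2))
    case "union" =>
      // Union only widens the types and caps the precision at 38; no scale adjustment.
      (min(max(s1, s2) + max(p1 - s1, p2 - s2), MaxPrecision), max(s1, s2))
    case other =>
      throw new IllegalArgumentException(s"unsupported operation: $other")
  }
}

// Example: the section 4 case, decimal(15,10) * decimal(25,10)
println(DecimalRules.resultType("*", 15, 10, 25, 10))  // prints (38,17)
```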
Now let's look at the limit of GPU mode (with the Rapids accelerator):
Currently, in the Rapids accelerator 0.4.1 and 0.5 snapshot releases, decimal support is limited to 18 digits (64 bits), as per this Doc.
So if the precision is greater than 18, the operation falls back to CPU mode.
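A quick way to see where 18 comes from: a signed 64-bit Long tops out at 9223372036854775807 (19 digits), so every unscaled value with at most 18 digits fits, but not every 19-digit one. Spark itself uses the same bound, Decimal.MAX_LONG_DIGITS = 18, for decimals it stores in a Long. The sketch below checks this with BigInt; `maxUnscaled` and `fitsIn64Bits` are hypothetical helpers for illustration, not Rapids or Spark APIs.

```scala
// Largest unscaled value for a given decimal precision: 10^precision - 1
def maxUnscaled(precision: Int): BigInt = BigInt(10).pow(precision) - 1

// Hypothetical helper: does every value of this precision fit in a signed 64-bit Long?
def fitsIn64Bits(precision: Int): Boolean = maxUnscaled(precision) <= BigInt(Long.MaxValue)

println(Long.MaxValue.toString.length)  // 19
println(fitsIn64Bits(18))               // true
println(fitsIn64Bits(19))               // false
```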
Below, let's run some tests to confirm that the theory matches practice.
Solution:
1. Prepare an example DataFrame with different decimal types
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.types._
// Turn on the Rapids accelerator and its decimal type support
spark.conf.set("spark.rapids.sql.enabled", true)
spark.conf.set("spark.rapids.sql.decimalType.enabled", true)
// A one-row DataFrame with one column per decimal type under test
val df = spark.sparkContext.parallelize(Seq(1)).toDF()
val df2=df.withColumn("value82", (lit("123456.78").cast(DecimalType(8,2)))).
withColumn("value63", (lit("123.456").cast(DecimalType(6,3)))).
withColumn("value1510", (lit("12345.0123456789").cast(DecimalType(15,10)))).
withColumn("value2510", (lit("123456789012345.0123456789").cast(DecimalType(25,10))))
// Save to Parquet (overwrite so the snippet can be re-run), read it back,
// and register it as the temp view "df2" used by the SQL below
df2.write.mode("overwrite").parquet("/tmp/df2.parquet")
val newdf2=spark.read.parquet("/tmp/df2.parquet")
newdf2.createOrReplaceTempView("df2")
newdf2's schema:
scala> newdf2.printSchema
root
|-- value: integer (nullable = false)
|-- value82: decimal(8,2) (nullable = true)
|-- value63: decimal(6,3) (nullable = true)
|-- value1510: decimal(15,10) (nullable = true)
|-- value2510: decimal(25,10) (nullable = true)
2. GPU Mode (result Decimal within GPU's limit: <= 18 digits)
The tests below make sure that every result decimal's precision is within the GPU's limit, which is 18 digits in this Rapids accelerator version.
So we only use 2 fields of df2: value82: decimal(8,2) and value63: decimal(6,3).
This is to confirm whether the theory holds in GPU mode.
To calculate the expected result precision and scale from the rules above in an easy way, let's use the code below:
import scala.math.{max, min}
val (p1,s1)=(8,2)
val (p2,s2)=(6,3)
2.1 Addition
val df_plus=spark.sql("SELECT value82+value63 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect
Output:
scala> val df_plus=spark.sql("SELECT value82+value63 FROM df2")
df_plus: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3)]
scala> df_plus.printSchema
root
|-- (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3))): decimal(10,3) (nullable = true)
scala> df_plus.explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [gpucheckoverflow((gpupromoteprecision(cast(value82#58 as decimal(10,3))) + gpupromoteprecision(cast(value63#59 as decimal(10,3)))), DecimalType(10,3), true) AS (CAST(value82 AS DECIMAL(10,3)) + CAST(value63 AS DECIMAL(10,3)))#88]
+- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>
scala> df_plus.collect
res21: Array[org.apache.spark.sql.Row] = Array([123580.236])
The result Decimal is (10,3), and the explain output shows that it runs on GPU. This matches the theory:
scala> max(s1, s2) + max(p1-s1, p2-s2) + 1
res7: Int = 10
scala> max(s1, s2)
res8: Int = 3
Note: In the following tests, I will only show the expected result instead of printing the full output, to keep this post short. Feel free to do the math yourself.
2.2 Subtraction
// Result Decimal (10,3)
val df_minus=spark.sql("SELECT value82-value63 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect
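Subtraction uses the same row of the table as addition, so the expected type is again (10,3). As plain Scala arithmetic following the table (runnable in spark-shell or any Scala REPL; the val names are just for this post):

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)
// e1 - e2: precision max(s1, s2) + max(p1-s1, p2-s2) + 1, scale max(s1, s2)
val minusPrecision = max(s1, s2) + max(p1 - s1, p2 - s2) + 1  // 10
val minusScale = max(s1, s2)                                   // 3
```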
2.3 Multiplication
// Result Decimal (15,5) -- falls back to CPU
val df_multi=spark.sql("SELECT value82*value63 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
scala> val df_multi=spark.sql("SELECT value82*value63 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5)]
scala> df_multi.printSchema
root
|-- (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3))): decimal(15,5) (nullable = true)
scala> df_multi.explain
21/05/04 18:02:21 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
@Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96 could run on GPU
@Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) could run on GPU
!Expression <Multiply> (promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(19,6)
@Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(9,3))) could run on GPU
@Expression <Cast> cast(value82#58 as decimal(9,3)) could run on GPU
@Expression <AttributeReference> value82#58 could run on GPU
@Expression <PromotePrecision> promote_precision(cast(value63#59 as decimal(9,3))) could run on GPU
@Expression <Cast> cast(value63#59 as decimal(9,3)) could run on GPU
@Expression <AttributeReference> value63#59 could run on GPU
== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(9,3))) * promote_precision(cast(value63#59 as decimal(9,3)))), DecimalType(15,5), true) AS (CAST(value82 AS DECIMAL(9,3)) * CAST(value63 AS DECIMAL(9,3)))#96]
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet [value82#58,value63#59] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value63:decimal(6,3)>
scala> df_multi.collect
res27: Array[org.apache.spark.sql.Row] = Array([15241480.23168])
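Although the final type (15,5) is within 18 digits, the explain output shows the Multiply falling back to CPU. The plan casts both operands to a common wider type, decimal(9,3), and the GPU warning is about the precision of multiplying two decimal(9,3) values, DecimalType(19,6). A small sketch of that arithmetic, assuming the wider type follows the union row of the table (max(s1, s2) + max(p1-s1, p2-s2), scale max(s1, s2)), which agrees with the casts in the plan:

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)

// Common wider type both operands are cast to: decimal(9,3)
val widerScale = max(s1, s2)                             // 3
val widerPrecision = max(p1 - s1, p2 - s2) + widerScale  // 9

// Multiplying two decimal(9,3) values: p + p + 1, s + s
val actualPrecision = 2 * widerPrecision + 1  // 19, above the 18-digit GPU limit
val actualScale = 2 * widerScale              // 6

// Final type reported by Spark: p1 + p2 + 1, s1 + s2
val resultPrecision = p1 + p2 + 1  // 15
val resultScale = s1 + s2          // 5
```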
2.4 Division
// Result Decimal (18,9) -- falls back to CPU
val df_div=spark.sql("SELECT value82/value63 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
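The expected (18,9) comes from the e1 / e2 row of the table. This only predicts the type; the explain output is what shows the CPU fallback.

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)
// e1 / e2: scale max(6, s1 + p2 + 1), precision p1 - s1 + s2 + scale
val divScale = max(6, s1 + p2 + 1)          // 9
val divPrecision = p1 - s1 + s2 + divScale  // 18
```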
2.5 Modulo
// Result Decimal (6,3) -- falls back to CPU
val df_mod=spark.sql("SELECT value82 % value63 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
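For modulo, the e1 % e2 row uses the smaller integer part, which is why the result shrinks to (6,3):

```scala
import scala.math.{max, min}

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)
// e1 % e2: precision min(p1-s1, p2-s2) + max(s1, s2), scale max(s1, s2)
val modPrecision = min(p1 - s1, p2 - s2) + max(s1, s2)  // 6
val modScale = max(s1, s2)                              // 3
```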
2.6 Union
// Result Decimal (9,3)
val df_union=spark.sql("SELECT value82 from df2 union SELECT value63 from df2")
df_union.printSchema
df_union.explain
df_union.collect
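Union is the addition rule without the extra carry digit (+1), giving (9,3):

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (6, 3)
// e1 union e2: precision max(s1, s2) + max(p1-s1, p2-s2), scale max(s1, s2)
val unionPrecision = max(s1, s2) + max(p1 - s1, p2 - s2)  // 9
val unionScale = max(s1, s2)                              // 3
```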
3. GPU Mode falling back to CPU (19 ~ 38 digits)
The tests below may fall back to CPU if the result decimal's precision is above the GPU's limit.
Here we only use 2 fields of df2: value82: decimal(8,2) and value1510: decimal(15,10).
3.1 Addition
// Result Decimal (17,10) -- within GPU limit
val df_plus=spark.sql("SELECT value82+value1510 FROM df2")
df_plus.printSchema
df_plus.explain
df_plus.collect
3.2 Subtraction
// Result Decimal (17,10) -- within GPU limit
val df_minus=spark.sql("SELECT value82-value1510 FROM df2")
df_minus.printSchema
df_minus.explain
df_minus.collect
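In this section the second operand is value1510, so (p2, s2) becomes (15,10). Addition and subtraction share one rule, which gives (17,10) for both:

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (15, 10)
// e1 + e2 and e1 - e2: max(s1, s2) + max(p1-s1, p2-s2) + 1, scale max(s1, s2)
val addPrecision = max(s1, s2) + max(p1 - s1, p2 - s2) + 1  // 17
val addScale = max(s1, s2)                                   // 10
```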
3.3 Multiplication
// Result Decimal (24,12) -- outside of GPU limit
val df_multi=spark.sql("SELECT value82*value1510 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
scala> val df_multi=spark.sql("SELECT value82*value1510 FROM df2")
df_multi: org.apache.spark.sql.DataFrame = [(CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12)]
scala> df_multi.printSchema
root
|-- (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10))): decimal(24,12) (nullable = true)
scala> df_multi.explain
21/05/04 18:44:46 WARN GpuOverrides:
!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced; unsupported data types in output: DecimalType(24,12)
!Expression <Alias> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 cannot run on GPU because expression Alias CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132 produces an unsupported type DecimalType(24,12); expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
!Expression <CheckOverflow> CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) cannot run on GPU because expression CheckOverflow CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) produces an unsupported type DecimalType(24,12)
!Expression <Multiply> (promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))) cannot run on GPU because The actual output precision of the multiply is too large to fit on the GPU DecimalType(33,20)
@Expression <PromotePrecision> promote_precision(cast(value82#58 as decimal(16,10))) could run on GPU
@Expression <Cast> cast(value82#58 as decimal(16,10)) could run on GPU
@Expression <AttributeReference> value82#58 could run on GPU
@Expression <PromotePrecision> promote_precision(cast(value1510#60 as decimal(16,10))) could run on GPU
@Expression <Cast> cast(value1510#60 as decimal(16,10)) could run on GPU
@Expression <AttributeReference> value1510#60 could run on GPU
== Physical Plan ==
*(1) Project [CheckOverflow((promote_precision(cast(value82#58 as decimal(16,10))) * promote_precision(cast(value1510#60 as decimal(16,10)))), DecimalType(24,12), true) AS (CAST(value82 AS DECIMAL(16,10)) * CAST(value1510 AS DECIMAL(16,10)))#132]
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet [value82#58,value1510#60] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/df2.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value82:decimal(8,2),value1510:decimal(15,10)>
scala> df_multi.collect
res51: Array[org.apache.spark.sql.Row] = Array([1524075473.257763907942])
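Here even the final type (24,12) exceeds 18 digits, which is why the warning reports "unsupported data types in output". The types in the log can be reproduced the same way as in 2.3, assuming the common wider type max(s1, s2) + max(p1-s1, p2-s2) with scale max(s1, s2), which matches the decimal(16,10) casts in the plan:

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (15, 10)

// Final result type: p1 + p2 + 1, s1 + s2
val resultPrecision = p1 + p2 + 1  // 24
val resultScale = s1 + s2          // 12

// Common wider type both operands are cast to: decimal(16,10)
val widerScale = max(s1, s2)                             // 10
val widerPrecision = max(p1 - s1, p2 - s2) + widerScale  // 16

// Multiplying two decimal(16,10) values, as in the GPU warning
val actualPrecision = 2 * widerPrecision + 1  // 33
val actualScale = 2 * widerScale              // 20
```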
3.4 Division
// Result Decimal (34,18) -- outside of GPU limit
val df_div=spark.sql("SELECT value82/value1510 FROM df2")
df_div.printSchema
df_div.explain
df_div.collect
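The division row gives (34,18); note how the large input precision p2 = 15 drives the scale up to 18:

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (15, 10)
// e1 / e2: scale max(6, s1 + p2 + 1), precision p1 - s1 + s2 + scale
val divScale = max(6, s1 + p2 + 1)          // 18
val divPrecision = p1 - s1 + s2 + divScale  // 34
```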
3.5 Modulo
// Result Decimal (15,10) -- within GPU limit, but falls back to CPU
val df_mod=spark.sql("SELECT value82 % value1510 FROM df2")
df_mod.printSchema
df_mod.explain
df_mod.collect
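The modulo row gives (15,10), well within 18 digits, so the type alone does not explain this fallback:

```scala
import scala.math.{max, min}

val (p1, s1) = (8, 2)
val (p2, s2) = (15, 10)
// e1 % e2: precision min(p1-s1, p2-s2) + max(s1, s2), scale max(s1, s2)
val modPrecision = min(p1 - s1, p2 - s2) + max(s1, s2)  // 15
val modScale = max(s1, s2)                              // 10
```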
3.6 Union
// Result Decimal (16,10) -- within GPU limit
val df_union=spark.sql("SELECT value82 from df2 union SELECT value1510 from df2")
df_union.printSchema
df_union.explain
df_union.collect
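The union row gives (16,10), the same decimal(16,10) that the multiplication in 3.3 casts its operands to:

```scala
import scala.math.max

val (p1, s1) = (8, 2)
val (p2, s2) = (15, 10)
// e1 union e2: precision max(s1, s2) + max(p1-s1, p2-s2), scale max(s1, s2)
val unionPrecision = max(s1, s2) + max(p1 - s1, p2 - s2)  // 16
val unionScale = max(s1, s2)                              // 10
```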
4. Above the decimal max range (> 38 digits)
// Result Decimal (38,17)
val df_multi=spark.sql("SELECT value1510*value2510 FROM df2")
df_multi.printSchema
df_multi.explain
df_multi.collect
scala> val (p1,s1)=(15,10)
p1: Int = 15
s1: Int = 10
scala> val (p2,s2)=(25,10)
p2: Int = 25
s2: Int = 10
scala> p1 + p2 + 1
res31: Int = 41
scala> s1 + s2
res32: Int = 20
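Since 41 exceeds the maximum precision of 38, Spark adjusts the result in this branch of DecimalType.adjustPrecisionScale:
} else {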
// Precision/scale exceed maximum precision. Result must be adjusted to MAX_PRECISION.
val intDigits = precision - scale
// If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; otherwise
// preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
val minScaleValue = Math.min(scale, MINIMUM_ADJUSTED_SCALE)
// The resulting scale is the maximum between what is available without causing a loss of
// digits for the integer part of the decimal and the minimum guaranteed scale, which is
// computed above
val adjustedScale = Math.max(MAX_PRECISION - intDigits, minScaleValue)
DecimalType(MAX_PRECISION, adjustedScale)
}
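Plugging precision = 41 and scale = 20 into that branch, with MAX_PRECISION = 38 and MINIMUM_ADJUSTED_SCALE = 6 (written here as plain Scala vals, not the Spark constants themselves), reproduces the (38,17) result type:

```scala
val MaxPrecision = 38
val MinimumAdjustedScale = 6

// Unadjusted result of decimal(15,10) * decimal(25,10): p1 + p2 + 1, s1 + s2
val precision = 15 + 25 + 1  // 41
val scale = 10 + 10          // 20

val intDigits = precision - scale                                      // 21
val minScaleValue = math.min(scale, MinimumAdjustedScale)              // 6
val adjustedScale = math.max(MaxPrecision - intDigits, minScaleValue)  // 17
// Final type: DecimalType(38, 17)
```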
scala> df_multi.collect
res67: Array[org.apache.spark.sql.Row] = Array([null])