Sunday, April 4, 2021

How to enable GpuKryoRegistrator on RAPIDS Accelerator for Spark

Goal:

This article shares the steps to enable GpuKryoRegistrator with the RAPIDS Accelerator for Apache Spark.

Env:

Spark 3.1.1

RAPIDS Accelerator for Apache Spark 0.4.1

Solution:

As mentioned in the Spark Tuning Guide:

  • Java serialization: By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
  • Kryo serialization: Spark can also use the Kryo library (version 4) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.

The RAPIDS Accelerator also provides a class named com.nvidia.spark.rapids.GpuKryoRegistrator, which registers with Kryo the following classes from org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExec:

  • SerializeConcatHostBuffersDeserializeBatch
  • SerializeBatchDeserializeHostBuffer 
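
Conceptually, a Spark Kryo registrator is a class implementing org.apache.spark.serializer.KryoRegistrator that pre-registers classes with Kryo. The sketch below is illustrative only: the class name MyGpuKryoRegistrator and the use of plain register calls are assumptions here, not the actual GpuKryoRegistrator source, which may differ in details:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.rapids.execution.{SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}

// Illustrative sketch: a KryoRegistrator pre-registers classes so Kryo
// serializes them with compact class IDs instead of full class names.
class MyGpuKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch])
    kryo.register(classOf[SerializeBatchDeserializeHostBuffer])
  }
}
```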

How to enable?

Set the following two parameters (for example, in spark-defaults.conf):

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator com.nvidia.spark.rapids.GpuKryoRegistrator
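
The same two settings can equivalently be passed on the command line. This is an illustrative spark-submit fragment; the application jar and the remaining options are omitted:

```
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator \
  ...
```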

Common Issues

A common issue with Kryo serialization is buffer overflow.

For example, when running query 7 of TPC-DS/NDS, the job may fail with:

Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 636
at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
at com.esotericsoftware.kryo.io.Output.write(Output.java:219)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1859)
at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:712)
at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at ai.rapids.cudf.JCudfSerialization$DataOutputStreamWriter.copyDataFrom(JCudfSerialization.java:600)
at ai.rapids.cudf.JCudfSerialization$DataWriter.copyDataFrom(JCudfSerialization.java:546)
at ai.rapids.cudf.JCudfSerialization.copySlicedAndPad(JCudfSerialization.java:1104)
at ai.rapids.cudf.JCudfSerialization.copySlicedOffsets(JCudfSerialization.java:1332)
at ai.rapids.cudf.JCudfSerialization.writeSliced(JCudfSerialization.java:1464)
at ai.rapids.cudf.JCudfSerialization.writeSliced(JCudfSerialization.java:1517)
at ai.rapids.cudf.JCudfSerialization.writeToStream(JCudfSerialization.java:1567)
at org.apache.spark.sql.rapids.execution.SerializeBatchDeserializeHostBuffer.writeObject(GpuBroadcastExchangeExec.scala:153)
at jdk.internal.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at com.esotericsoftware.kryo.serializers.JavaSerializer.write(JavaSerializer.java:51)
... 9 more

The fix is to increase spark.kryoserializer.buffer.max from its default of 64m to a larger value, say 512m:

spark.kryoserializer.buffer.max 512m
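
For programmatic setups, all three settings can be combined on a SparkConf. This is only a sketch; note that serializer-related settings must be in place before the SparkContext is created and cannot be changed on a running session:

```scala
import org.apache.spark.SparkConf

// Sketch: Kryo serializer, the RAPIDS registrator, and a larger
// max serialization buffer, set before SparkContext creation.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.nvidia.spark.rapids.GpuKryoRegistrator")
  .set("spark.kryoserializer.buffer.max", "512m")
```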
