Tuesday, February 2, 2021

Spark Tuning -- Column Projection for Parquet

Goal:

This article explains the column projection for parquet format(or other columnar format) in Spark.

Solution:

Spark can do column projection for columnar format data such as Parquet.

The idea is to only read the needed columns instead of reading all of the columns.

This can reduce lots of I/O needed to improve the performance.

Below is one example.

Note:  To show difference of performance for column projection,  I disabled Parquet filter pushdown feature by setting spark.sql.parquet.filterPushdown=false in my configuration. 

I will discuss about Parquet filter pushdown feature in another article.

1.  Save a sample DataFrame as parquet files.

val df = spark.read.json("/data/activity-data/")
val targetdir = "/tmp/test_column_projection/newdf"
df.write.mode("overwrite").format("parquet").save(targetdir)

2. Select only 1 column

val somecols  = "SELECT Device FROM readdf WHERE Model='something_not_exist'"
val goodresult = spark.sql(somecols)
goodresult.explain
goodresult.collect

Output:

scala> goodresult.explain
== Physical Plan ==
*(1) Project [Device#48]
+- *(1) Filter (isnotnull(Model#50) && (Model#50 = something_not_exist))
+- *(1) FileScan parquet [Device#48,Model#50] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Model), EqualTo(Model,something_not_exist)], ReadSchema: struct<Device:string,Model:string>

scala> goodresult.collect
res5: Array[org.apache.spark.sql.Row] = Array()

3. Select ALL columns

val allcols = "SELECT * FROM readdf where Model='something_not_exist'"
val badresult = spark.sql(allcols)
badresult.explain
badresult.collect

 Output:

scala> badresult.explain
== Physical Plan ==
*(1) Project [Arrival_Time#46L, Creation_Time#47L, Device#48, Index#49L, Model#50, User#51, gt#52, x#53, y#54, z#55]
+- *(1) Filter (isnotnull(Model#50) && (Model#50 = something_not_exist))
+- *(1) FileScan parquet [Arrival_Time#46L,Creation_Time#47L,Device#48,Index#49L,Model#50,User#51,gt#52,x#53,y#54,z#55] Batched: true, Format: Parquet, Location: InMemoryFileIndex[maprfs:///tmp/test_column_projection/newdf], PartitionFilters: [], PushedFilters: [IsNotNull(Model), EqualTo(Model,something_not_exist)], ReadSchema: struct<Arrival_Time:bigint,Creation_Time:bigint,Device:string,Index:bigint,Model:string,User:stri...

scala> badresult.collect
res7: Array[org.apache.spark.sql.Row] = Array()

Analysis:

1. Explain plan

During FileScan, we can see only the needed columns are read due to column projection feature:

FileScan parquet [Device#48,Model#50]

vs

FileScan parquet [Arrival_Time#46L,Creation_Time#47L,Device#48,Index#49L,Model#50,User#51,gt#52,x#53,y#54,z#55]

2. Event log/Web UI

The "SELECT only 1 column"'s stage shows sum of Input Size=868.3KB.

 


 

 

 

 

The "SELECT ALL columns"'s stage shows sum of Input Size=142.3MB.   

 


 

 

 

 

 Note: Here is the Complete Sample Code.


No comments:

Post a Comment

Popular Posts