Tuesday, February 2, 2021

Spark Tuning -- Use Partition Discovery feature to do partition pruning

Goal:

This article explains how to use the Partition Discovery feature to do partition pruning in Spark.

Solution:

If the data directories are organized the same way Hive partitions are (one subdirectory per partition value, named column=value), Spark can discover the partition column(s) automatically using the Partition Discovery feature.

After that, queries on top of the partitioned table can do partition pruning.
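For example, a dataset partitioned by a Country column is laid out as one subdirectory per partition value, with the column name and value encoded in the directory name (the file names below are illustrative, not from the actual dataset):

```
/tmp/test_partition_pruning/newdf/Country=Australia/part-00000.snappy.orc
/tmp/test_partition_pruning/newdf/Country=Canada/part-00000.snappy.orc
/tmp/test_partition_pruning/newdf/Country=Italy/part-00000.snappy.orc
```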

Below is one example:

1. Create a DataFrame based on sample data and add a new duplicate column.

val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/data/retail-data/by-day/*.csv")
//Add a new column named "AnotherCountry" with the same value as column "Country" so that we can compare the different query plans.
val newdf = df.withColumn("AnotherCountry", expr("Country"))

2. Save the DataFrame as partitioned ORC files.

val targetdir = "/tmp/test_partition_pruning/newdf"
newdf.write.mode("overwrite").format("orc").partitionBy("Country").save(targetdir)

3. Let's take a look at the target directory.

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(targetdir)).filter(_.isDirectory).map(_.getPath).foreach(println)

Output is:

maprfs:///tmp/test_partition_pruning/newdf/Country=Australia
maprfs:///tmp/test_partition_pruning/newdf/Country=Netherlands
maprfs:///tmp/test_partition_pruning/newdf/Country=Canada
maprfs:///tmp/test_partition_pruning/newdf/Country=Italy
maprfs:///tmp/test_partition_pruning/newdf/Country=Denmark
maprfs:///tmp/test_partition_pruning/newdf/Country=Iceland
...
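Partition Discovery can also be confirmed by reading the directory back and inspecting the schema: Spark re-creates the Country column from the Country=value subdirectory names, even though that column is not stored inside the ORC files themselves. Below is a minimal self-contained sketch of this check (it assumes Spark is on the classpath, and uses a local session and a temporary directory instead of the paths above):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object PartitionDiscoveryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-discovery-demo")
      .getOrCreate()
    import spark.implicits._

    // Write a tiny DataFrame partitioned by Country, as in the article.
    val target = Files.createTempDirectory("partition_pruning_demo").toString
    Seq(("1001", "Australia"), ("1002", "Canada"))
      .toDF("InvoiceNo", "Country")
      .write.mode("overwrite").format("orc").partitionBy("Country")
      .save(target)

    // Reading the directory back: Spark discovers Country from the
    // Country=<value> subdirectory names and adds it to the schema.
    val readdf = spark.read.format("orc").load(target)
    readdf.printSchema()
    assert(readdf.columns.contains("Country"))

    spark.stop()
  }
}
```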

4. "good" query uses partition pruning

val readdf = spark.read.format("orc").load(targetdir)
readdf.createOrReplaceTempView("readdf")

val goodsql = "SELECT * FROM readdf WHERE Country = 'Australia'"
val goodresult = spark.sql(goodsql)
goodresult.explain
println(s"Result: ${goodresult.count()} ")

Output:

== Physical Plan ==
*(1) FileScan orc [InvoiceNo#53,StockCode#54,Description#55,Quantity#56,InvoiceDate#57,UnitPrice#58,CustomerID#59,AnotherCountry#60,Country#61] Batched: true, Format: ORC, Location: InMemoryFileIndex[maprfs:///tmp/test_partition_pruning/newdf], PartitionCount: 1, PartitionFilters: [isnotnull(Country#61), (Country#61 = Australia)], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:int,InvoiceDate:timestamp,Un...

Result: 1259

5. "Bad" query can not use partition pruning

val badsql = "SELECT * FROM readdf WHERE AnotherCountry = 'Australia'"
val badresult = spark.sql(badsql)
badresult.explain
println(s"Result: ${badresult.count()} ")

Output:

== Physical Plan ==
*(1) Project [InvoiceNo#53, StockCode#54, Description#55, Quantity#56, InvoiceDate#57, UnitPrice#58, CustomerID#59, AnotherCountry#60, Country#61]
+- *(1) Filter (isnotnull(AnotherCountry#60) && (AnotherCountry#60 = Australia))
+- *(1) FileScan orc [InvoiceNo#53,StockCode#54,Description#55,Quantity#56,InvoiceDate#57,UnitPrice#58,CustomerID#59,AnotherCountry#60,Country#61] Batched: true, Format: ORC, Location: InMemoryFileIndex[maprfs:///tmp/test_partition_pruning/newdf], PartitionCount: 38, PartitionFilters: [], PushedFilters: [IsNotNull(AnotherCountry), EqualTo(AnotherCountry,Australia)], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:int,InvoiceDate:timestamp,Un...

Result: 1259

Analysis:

1. Explain plan

From the explain plans above, it is pretty obvious why the "good" query uses partition pruning while the "bad" query does not: column "Country" is the partition key.

The "good" query can push the "Filter" inside the "FileScan" as "PartitionFilters".

So it only needs to scan 1 partition (directory):

PartitionCount: 1, PartitionFilters: [isnotnull(Country#61), (Country#61 = Australia)]

However the "bad" query has to scan all the 38 partitions(direcotries) firstly and then apply Filter:

PartitionCount: 38, PartitionFilters: []
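Since "AnotherCountry" mirrors "Country" in this data, one fix is simply to rewrite the predicate against the partition column so that Spark can prune directories again. The sketch below is a self-contained illustration of that rewrite (local session and small made-up data rather than the retail dataset):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionPruningRewrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-pruning-rewrite")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-in for the retail data: AnotherCountry duplicates Country.
    val target = Files.createTempDirectory("pruning_rewrite_demo").toString
    Seq(("1001", "Australia"), ("1002", "Canada"))
      .toDF("InvoiceNo", "Country")
      .withColumn("AnotherCountry", col("Country"))
      .write.mode("overwrite").format("orc").partitionBy("Country").save(target)

    spark.read.format("orc").load(target).createOrReplaceTempView("readdf")

    // Rewriting the "bad" predicate to filter on the partition column instead;
    // explain() would now show the predicate under PartitionFilters.
    val rewritten = spark.sql("SELECT * FROM readdf WHERE Country = 'Australia'")
    rewritten.explain()
    assert(rewritten.count() == 1)

    spark.stop()
  }
}
```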

2. Event log/Web UI

Looking only at the related stage for the "good" query, the total input size is just over 80 KB, and the sum of records equals the final result: 1259.


Looking only at the related stage for the "bad" query, the total input size is over 2 MB, while the sum of records still equals the final result: 1259.


Of course, there is also a large difference in execution time.

Note: Here is the Complete Sample Code.

