Monday, February 8, 2021

Spark Tuning -- Understand Cost Based Optimizer in Spark

Goal:

This article explains Spark CBO (Cost-Based Optimizer) with examples and shows how to check table statistics.

Env:

Spark 2.4.4

MapR 6.1

MySQL as backend database for Hive Metastore

Concept:

As in any traditional RDBMS, the CBO's job is to determine the best query execution plan based on table statistics. 

CBO was introduced in Spark 2.2. Before that, only the RBO (Rule-Based Optimizer) was used.

Before using CBO, we need to collect table/column-level statistics (including histograms) using the ANALYZE TABLE command.

  • Note: As of Spark 2.4.4, CBO is disabled by default and is controlled by the parameter spark.sql.cbo.enabled.
  • Note: As of Spark 2.4.4, histogram statistics collection is disabled by default and is controlled by the parameter spark.sql.statistics.histogram.enabled. 
  • Note: Spark uses an equal-height histogram instead of an equal-width histogram.
  • Note: As of Spark 2.4.4, the default number of histogram buckets is 254, controlled by the parameter spark.sql.statistics.histogram.numBins. (A spark-shell sketch of these settings follows below.)
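
For reference, here is a minimal spark-shell sketch that turns both features on before collecting statistics. The parameter names are the ones from the notes above; the table/column names are just examples used later in this post:

spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
// optionally change the number of equal-height buckets (default is 254)
spark.conf.set("spark.sql.statistics.histogram.numBins", "254")
spark.sql("ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk")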

What is included in column-level statistics?

  • For Numeric/Date/Timestamp types: Distinct Count, Max, Min, Null Count, Average Length, Max Length.
  • For String/Binary types: Distinct Count, Null Count, Average Length, Max Length (no min/max). A quick way to verify this per column is sketched below.
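
For example, a minimal spark-shell sketch that collects and inspects statistics for a string column (c_first_name is just an example column from the customer table used later in this post):

spark.sql("ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_first_name")
// string columns report distinct_count/num_nulls/avg_col_len/max_col_len but no min/max
spark.sql("DESCRIBE EXTENDED customer c_first_name").show(100, false)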

CBO uses logical optimization rules to optimize the logical plan. 

So if we want to examine the statistics in a query explain plan, we can find them in the "Optimized Logical Plan" section.
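
A minimal spark-shell sketch to look at those statistics programmatically (assuming the tpcds.customer table used in the examples below):

sql("use tpcds")
val optimized = spark.sql("select * from customer").queryExecution.optimizedPlan
// prints Statistics(sizeInBytes=..., rowCount=..., hints=none); rowCount appears once statistics have been collected
println(optimized.stats)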

Solution:

Here we will use some simple query examples based on a test table named "customer" (generated by the TPC-DS tool shared in this post) to demonstrate CBO and statistics in Spark.

All SQL statements below are executed in spark-sql unless otherwise noted.

1. Collect Table/Column statistics

1.1 Table level statistics including total number of rows and data size:

ANALYZE TABLE customer COMPUTE STATISTICS;

1.2 Table + Column statistics: 

ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk;

1.3 Table + Column statistics with histogram:

set spark.sql.statistics.histogram.enabled=true;
ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk;
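
The same commands can also be run from spark-shell. For example, a minimal sketch that collects histogram statistics for several columns at once (the column names are just examples from the customer table):

spark.sql("set spark.sql.statistics.histogram.enabled=true")
spark.sql("ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk, c_birth_year, c_first_name")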

2. View Table/Column statistics

2.1 Table level statistics:

DESCRIBE EXTENDED customer;

Output:

Statistics	26670841 bytes, 500000 rows

2.2 Column level statistics including histogram:

DESCRIBE EXTENDED customer c_customer_sk;

Output:

col_name	c_customer_sk
data_type int
comment NULL
min 1
max 500000
num_nulls 0
distinct_count 500000
avg_col_len 4
max_col_len 4
histogram height: 1968.5039370078741, num_of_bins: 254
bin_0 lower_bound: 1.0, upper_bound: 1954.0, distinct_count: 1982
bin_1 lower_bound: 1954.0, upper_bound: 3898.0, distinct_count: 1893
...
bin_253 lower_bound: 497982.0, upper_bound: 500000.0, distinct_count: 2076

2.3 Check statistics in the backend database of the Hive Metastore (e.g. MySQL):

select tp.PARAM_KEY, tp.PARAM_VALUE
from DBS d,TBLS t, TABLE_PARAMS tp
where t.DB_ID = d.DB_ID
and tp.TBL_ID=t.TBL_ID
and d.NAME='tpcds' and t.TBL_NAME='customer'
and (
tp.PARAM_KEY in (
'spark.sql.statistics.numRows',
'spark.sql.statistics.totalSize'
)
or
tp.PARAM_KEY like 'spark.sql.statistics.colStats.c_customer_sk.%'
)
and tp.PARAM_KEY not like 'spark.sql.statistics.colStats.%.histogram'
;

Output:

+-----------------------------------------------------------+-------------+
| PARAM_KEY | PARAM_VALUE |
+-----------------------------------------------------------+-------------+
| spark.sql.statistics.colStats.c_customer_sk.avgLen | 4 |
| spark.sql.statistics.colStats.c_customer_sk.distinctCount | 500000 |
| spark.sql.statistics.colStats.c_customer_sk.max | 500000 |
| spark.sql.statistics.colStats.c_customer_sk.maxLen | 4 |
| spark.sql.statistics.colStats.c_customer_sk.min | 1 |
| spark.sql.statistics.colStats.c_customer_sk.nullCount | 0 |
| spark.sql.statistics.colStats.c_customer_sk.version | 1 |
| spark.sql.statistics.numRows | 500000 |
| spark.sql.statistics.totalSize | 26670841 |
+-----------------------------------------------------------+-------------+
9 rows in set (0.00 sec)

2.4  View statistics in spark-shell to understand which classes are used to store statistics

val db = "tpcds"
val tableName = "customer"
val colName = "c_customer_sk"

// fetch the table metadata (including statistics) from the external catalog (Hive Metastore)
val metadata = spark.sharedState.externalCatalog.getTable(db, tableName)
val stats = metadata.stats.get              // table-level statistics
val colStats = stats.colStats               // map of column name -> column statistics
val c_customer_sk_stats = colStats(colName) // statistics for c_customer_sk

val props = c_customer_sk_stats.toMap(colName)
println(props)

Output:

scala> println(props)
Map(c_customer_sk.avgLen -> 4, c_customer_sk.nullCount -> 0, c_customer_sk.distinctCount -> 500000, c_customer_sk.histogram -> XXXYYYZZZ, c_customer_sk.min -> 1, c_customer_sk.max -> 500000, c_customer_sk.version -> 1, c_customer_sk.maxLen -> 4)

Basically above "c_customer_sk_stats" is of class org.apache.spark.sql.catalyst.catalog.CatalogColumnStat which is defined inside ./sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
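
The table-level counterpart is org.apache.spark.sql.catalyst.catalog.CatalogStatistics (the "stats" value above). A minimal sketch of what can be read from these objects, using the field names as in Spark 2.4 (an illustration, not an exhaustive list; the expected values come from the outputs shown earlier):

// table-level statistics
println(stats.sizeInBytes)                   // 26670841
println(stats.rowCount)                      // Some(500000)
// column-level statistics are Options since they may not have been collected
println(c_customer_sk_stats.distinctCount)   // Some(500000)
println(c_customer_sk_stats.min)             // Some(1)
println(c_customer_sk_stats.max)             // Some(500000)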

3. Check cardinality based on statistics

From the above statistics for column "c_customer_sk" in table "customer", we know that this column is unique, with 500000 distinct values ranging from 1 to 500000.

In the RBO world, whether the filter is "where c_customer_sk < 500" or "where c_customer_sk < 500000", the Filter operator always shows "sizeInBytes=119.2 MB", which is the total table size, and no rowCount is shown.

spark-sql> set spark.sql.cbo.enabled=false;
spark.sql.cbo.enabled false
Time taken: 0.013 seconds, Fetched 1 row(s)

spark-sql> explain cost select c_customer_sk from customer where c_customer_sk < 500;
== Optimized Logical Plan ==
Project [c_customer_sk#724], Statistics(sizeInBytes=6.4 MB, hints=none)
+- Filter (isnotnull(c_customer_sk#724) && (c_customer_sk#724 < 500)), Statistics(sizeInBytes=119.2 MB, hints=none)
+- Relation[c_customer_sk#724,c_customer_id#725,c_current_cdemo_sk#726,c_current_hdemo_sk#727,c_current_addr_sk#728,c_first_shipto_date_sk#729,c_first_sales_date_sk#730,c_salutation#731,c_first_name#732,c_last_name#733,c_preferred_cust_flag#734,c_birth_day#735,c_birth_month#736,c_birth_year#737,c_birth_country#738,c_login#739,c_email_address#740,c_last_review_date#741] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)

spark-sql> explain cost select c_customer_sk from customer where c_customer_sk < 500000;
== Optimized Logical Plan ==
Project [c_customer_sk#724], Statistics(sizeInBytes=6.4 MB, hints=none)
+- Filter (isnotnull(c_customer_sk#724) && (c_customer_sk#724 < 500000)), Statistics(sizeInBytes=119.2 MB, hints=none)
+- Relation[c_customer_sk#724,c_customer_id#725,c_current_cdemo_sk#726,c_current_hdemo_sk#727,c_current_addr_sk#728,c_first_shipto_date_sk#729,c_first_sales_date_sk#730,c_salutation#731,c_first_name#732,c_last_name#733,c_preferred_cust_flag#734,c_birth_day#735,c_birth_month#736,c_birth_year#737,c_birth_country#738,c_login#739,c_email_address#740,c_last_review_date#741] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)

In the CBO world, the Filter operator shows an estimated data size and rowCount derived from the column-level statistics:

sizeInBytes=122.8 KB, rowCount=503 for the selective predicate vs sizeInBytes=119.2 MB, rowCount=5.00E+5 for the non-selective one. (A back-of-envelope calculation of the rowCount=503 estimate follows the plans below.)

spark-sql> set spark.sql.cbo.enabled=true;
spark.sql.cbo.enabled true
Time taken: 0.02 seconds, Fetched 1 row(s)

spark-sql> explain cost select c_customer_sk from customer where c_customer_sk < 500;
== Optimized Logical Plan ==
Project [c_customer_sk#1024], Statistics(sizeInBytes=5.9 KB, rowCount=503, hints=none)
+- Filter (isnotnull(c_customer_sk#1024) && (c_customer_sk#1024 < 500)), Statistics(sizeInBytes=122.8 KB, rowCount=503, hints=none)
+- Relation[c_customer_sk#1024,c_customer_id#1025,c_current_cdemo_sk#1026,c_current_hdemo_sk#1027,c_current_addr_sk#1028,c_first_shipto_date_sk#1029,c_first_sales_date_sk#1030,c_salutation#1031,c_first_name#1032,c_last_name#1033,c_preferred_cust_flag#1034,c_birth_day#1035,c_birth_month#1036,c_birth_year#1037,c_birth_country#1038,c_login#1039,c_email_address#1040,c_last_review_date#1041] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)

spark-sql> explain cost select c_customer_sk from customer where c_customer_sk < 500000;
== Optimized Logical Plan ==
Project [c_customer_sk#1024], Statistics(sizeInBytes=5.7 MB, rowCount=5.00E+5, hints=none)
+- Filter (isnotnull(c_customer_sk#1024) && (c_customer_sk#1024 < 500000)), Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Relation[c_customer_sk#1024,c_customer_id#1025,c_current_cdemo_sk#1026,c_current_hdemo_sk#1027,c_current_addr_sk#1028,c_first_shipto_date_sk#1029,c_first_sales_date_sk#1030,c_salutation#1031,c_first_name#1032,c_last_name#1033,c_preferred_cust_flag#1034,c_birth_day#1035,c_birth_month#1036,c_birth_year#1037,c_birth_country#1038,c_login#1039,c_email_address#1040,c_last_review_date#1041] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
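
Roughly speaking, for the "c_customer_sk < 500" predicate the estimated rowCount comes from the column's min/max (and, when collected, the equal-height histogram). A back-of-envelope sketch assuming a simple uniform-distribution model; the real estimator in Spark refines this with the histogram, which is presumably why the plan shows 503 rather than exactly 499:

val min = 1.0
val max = 500000.0
val tableRows = 500000.0
val selectivity = (500.0 - min) / (max - min)   // ~0.000998
val estimatedRows = tableRows * selectivity     // ~499, in the same ballpark as the 503 shown in the plan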

Note: In spark-shell, we can fetch the same information as follows:

spark.conf.set("spark.sql.cbo.enabled","true")
sql(s"use tpcds")
val stats = spark.sql("select c_customer_sk from customer where c_customer_sk < 500").queryExecution.stringWithStats
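
Printing the resulting string then shows the same plan sections, including the statistics in the "Optimized Logical Plan":

println(stats)
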
For more information on how Spark CBO calculates cardinality for Filter/Join/other operators, please refer to the slides and training session listed in the References section below.

4. Broadcast Join

As in any MPP-architecture query engine or SQL-on-Hadoop product (such as Hive, Impala, or Drill), broadcast join is not a new thing.

By default in Spark, a table/dataset smaller than 10MB (configured by spark.sql.autoBroadcastJoinThreshold) can be broadcast to all worker nodes.
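
Besides the size-based threshold, a broadcast can also be requested explicitly with the broadcast() function hint. A minimal spark-shell sketch (the DataFrames here are just examples built from the customer table, assuming the current database is tpcds as in the earlier spark-shell example):

import org.apache.spark.sql.functions.broadcast
val big = spark.table("customer").filter("c_customer_sk < 500000")
val small = spark.table("customer").filter("c_customer_sk < 500")
// hint the planner to broadcast the smaller side regardless of the size estimate
val joined = big.join(broadcast(small), Seq("c_first_name"))
joined.explain()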

Look at the example join query below:

spark-sql> explain cost select * from customer a, customer b where a.c_first_name=b.c_first_name and a.c_customer_sk<500000 and b.c_customer_sk<500;
== Optimized Logical Plan ==
Join Inner, (c_first_name#18 = c_first_name#62), Statistics(sizeInBytes=22.5 MB, rowCount=4.80E+4, hints=none)
:- Filter ((isnotnull(c_customer_sk#10) && (c_customer_sk#10 < 500000)) && isnotnull(c_first_name#18)), Statistics(sizeInBytes=115.1 MB, rowCount=4.83E+5, hints=none)
: +- Relation[c_customer_sk#10,c_customer_id#11,c_current_cdemo_sk#12,c_current_hdemo_sk#13,c_current_addr_sk#14,c_first_shipto_date_sk#15,c_first_sales_date_sk#16,c_salutation#17,c_first_name#18,c_last_name#19,c_preferred_cust_flag#20,c_birth_day#21,c_birth_month#22,c_birth_year#23,c_birth_country#24,c_login#25,c_email_address#26,c_last_review_date#27] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Filter ((isnotnull(c_customer_sk#54) && (c_customer_sk#54 < 500)) && isnotnull(c_first_name#62)), Statistics(sizeInBytes=118.7 KB, rowCount=486, hints=none)
+- Relation[c_customer_sk#54,c_customer_id#55,c_current_cdemo_sk#56,c_current_hdemo_sk#57,c_current_addr_sk#58,c_first_shipto_date_sk#59,c_first_sales_date_sk#60,c_salutation#61,c_first_name#62,c_last_name#63,c_preferred_cust_flag#64,c_birth_day#65,c_birth_month#66,c_birth_year#67,c_birth_country#68,c_login#69,c_email_address#70,c_last_review_date#71] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)

== Physical Plan ==
*(2) BroadcastHashJoin [c_first_name#18], [c_first_name#62], Inner, BuildRight
:- *(2) Project [c_customer_sk#10, c_customer_id#11, c_current_cdemo_sk#12, c_current_hdemo_sk#13, c_current_addr_sk#14, c_first_shipto_date_sk#15, c_first_sales_date_sk#16, c_salutation#17, c_first_name#18, c_last_name#19, c_preferred_cust_flag#20, c_birth_day#21, c_birth_month#22, c_birth_year#23, c_birth_country#24, c_login#25, c_email_address#26, c_last_review_date#27]
: +- *(2) Filter ((isnotnull(c_customer_sk#10) && (c_customer_sk#10 < 500000)) && isnotnull(c_first_name#18))
: +- *(2) FileScan parquet tpcds.customer[c_customer_sk#10,c_customer_id#11,c_current_cdemo_sk#12,c_current_hdemo_sk#13,c_current_addr_sk#14,c_first_shipto_date_sk#15,c_first_sales_date_sk#16,c_salutation#17,c_first_name#18,c_last_name#19,c_preferred_cust_flag#20,c_birth_day#21,c_birth_month#22,c_birth_year#23,c_birth_country#24,c_login#25,c_email_address#26,c_last_review_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), LessThan(c_customer_sk,500000), IsNotNull(c_first_name)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[8, string, true]))
+- *(1) Project [c_customer_sk#54, c_customer_id#55, c_current_cdemo_sk#56, c_current_hdemo_sk#57, c_current_addr_sk#58, c_first_shipto_date_sk#59, c_first_sales_date_sk#60, c_salutation#61, c_first_name#62, c_last_name#63, c_preferred_cust_flag#64, c_birth_day#65, c_birth_month#66, c_birth_year#67, c_birth_country#68, c_login#69, c_email_address#70, c_last_review_date#71]
+- *(1) Filter ((isnotnull(c_customer_sk#54) && (c_customer_sk#54 < 500)) && isnotnull(c_first_name#62))
+- *(1) FileScan parquet tpcds.customer[c_customer_sk#54,c_customer_id#55,c_current_cdemo_sk#56,c_current_hdemo_sk#57,c_current_addr_sk#58,c_first_shipto_date_sk#59,c_first_sales_date_sk#60,c_salutation#61,c_first_name#62,c_last_name#63,c_preferred_cust_flag#64,c_birth_day#65,c_birth_month#66,c_birth_year#67,c_birth_country#68,c_login#69,c_email_address#70,c_last_review_date#71] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), LessThan(c_customer_sk,500), IsNotNull(c_first_name)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
Time taken: 0.098 seconds, Fetched 1 row(s)

From the "Optimized Logical Plan", we know the estimated size of the smaller table is (sizeInBytes=118.7 KB, rowCount=486). So it can be broadcasted and that is why we see "BroadcastHashJoin" in "Physical Plan".

If we decrease spark.sql.autoBroadcastJoinThreshold to 118KB (118*1024=120832 bytes), which is below the estimated 118.7 KB, the join is converted to a SortMergeJoin:

spark-sql> set spark.sql.autoBroadcastJoinThreshold=120832;
spark.sql.autoBroadcastJoinThreshold 120832
Time taken: 0.016 seconds, Fetched 1 row(s)
spark-sql> explain cost select * from customer a, customer b where a.c_first_name=b.c_first_name and a.c_customer_sk<500000 and b.c_customer_sk<500;
== Optimized Logical Plan ==
Join Inner, (c_first_name#18 = c_first_name#94), Statistics(sizeInBytes=22.5 MB, rowCount=4.80E+4, hints=none)
:- Filter ((isnotnull(c_customer_sk#10) && (c_customer_sk#10 < 500000)) && isnotnull(c_first_name#18)), Statistics(sizeInBytes=115.1 MB, rowCount=4.83E+5, hints=none)
: +- Relation[c_customer_sk#10,c_customer_id#11,c_current_cdemo_sk#12,c_current_hdemo_sk#13,c_current_addr_sk#14,c_first_shipto_date_sk#15,c_first_sales_date_sk#16,c_salutation#17,c_first_name#18,c_last_name#19,c_preferred_cust_flag#20,c_birth_day#21,c_birth_month#22,c_birth_year#23,c_birth_country#24,c_login#25,c_email_address#26,c_last_review_date#27] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Filter ((isnotnull(c_customer_sk#86) && (c_customer_sk#86 < 500)) && isnotnull(c_first_name#94)), Statistics(sizeInBytes=118.7 KB, rowCount=486, hints=none)
+- Relation[c_customer_sk#86,c_customer_id#87,c_current_cdemo_sk#88,c_current_hdemo_sk#89,c_current_addr_sk#90,c_first_shipto_date_sk#91,c_first_sales_date_sk#92,c_salutation#93,c_first_name#94,c_last_name#95,c_preferred_cust_flag#96,c_birth_day#97,c_birth_month#98,c_birth_year#99,c_birth_country#100,c_login#101,c_email_address#102,c_last_review_date#103] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)

== Physical Plan ==
*(5) SortMergeJoin [c_first_name#18], [c_first_name#94], Inner
:- *(2) Sort [c_first_name#18 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c_first_name#18, 200)
: +- *(1) Project [c_customer_sk#10, c_customer_id#11, c_current_cdemo_sk#12, c_current_hdemo_sk#13, c_current_addr_sk#14, c_first_shipto_date_sk#15, c_first_sales_date_sk#16, c_salutation#17, c_first_name#18, c_last_name#19, c_preferred_cust_flag#20, c_birth_day#21, c_birth_month#22, c_birth_year#23, c_birth_country#24, c_login#25, c_email_address#26, c_last_review_date#27]
: +- *(1) Filter ((isnotnull(c_customer_sk#10) && (c_customer_sk#10 < 500000)) && isnotnull(c_first_name#18))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#10,c_customer_id#11,c_current_cdemo_sk#12,c_current_hdemo_sk#13,c_current_addr_sk#14,c_first_shipto_date_sk#15,c_first_sales_date_sk#16,c_salutation#17,c_first_name#18,c_last_name#19,c_preferred_cust_flag#20,c_birth_day#21,c_birth_month#22,c_birth_year#23,c_birth_country#24,c_login#25,c_email_address#26,c_last_review_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), LessThan(c_customer_sk,500000), IsNotNull(c_first_name)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- *(4) Sort [c_first_name#94 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c_first_name#94, 200)
+- *(3) Project [c_customer_sk#86, c_customer_id#87, c_current_cdemo_sk#88, c_current_hdemo_sk#89, c_current_addr_sk#90, c_first_shipto_date_sk#91, c_first_sales_date_sk#92, c_salutation#93, c_first_name#94, c_last_name#95, c_preferred_cust_flag#96, c_birth_day#97, c_birth_month#98, c_birth_year#99, c_birth_country#100, c_login#101, c_email_address#102, c_last_review_date#103]
+- *(3) Filter ((isnotnull(c_customer_sk#86) && (c_customer_sk#86 < 500)) && isnotnull(c_first_name#94))
+- *(3) FileScan parquet tpcds.customer[c_customer_sk#86,c_customer_id#87,c_current_cdemo_sk#88,c_current_hdemo_sk#89,c_current_addr_sk#90,c_first_shipto_date_sk#91,c_first_sales_date_sk#92,c_salutation#93,c_first_name#94,c_last_name#95,c_preferred_cust_flag#96,c_birth_day#97,c_birth_month#98,c_birth_year#99,c_birth_country#100,c_login#101,c_email_address#102,c_last_review_date#103] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), LessThan(c_customer_sk,500), IsNotNull(c_first_name)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
Time taken: 0.113 seconds, Fetched 1 row(s)

If we run the query "select * from customer a, customer b where a.c_first_name=b.c_first_name and a.c_customer_sk<5 and b.c_customer_sk<500;" and look at the Spark web UI, we can also see the estimated cardinality there.

All in all, CBO is a huge topic in any database/query engine. I will discuss it more in future posts.

References:

Cost Based Optimizer in Apache Spark 2.2 

Training: Cardinality Estimation through Histogram in Apache Spark 2.

 
