Tuesday, February 16, 2021

Spark Tuning -- explaining Spark SQL Join Types

Goal:

This article explains the different types of joins in Spark SQL using sample queries and their explain plans.

For each join type, we will cover its use case, the code logic for join selection, and the available hints.

Env:

Spark 2.4.4

Concept:

Most MPP databases and distributed query engines share a similar design for SQL joins.

The most popular join types are Shuffle Join and Broadcast Join.

They may have different names in different products. For example, in Hive, the Shuffle join is also called Common Join while Broadcast Join is also called Map Join.

I have explained these join types for different products in other posts on this blog.

In this post, we will discuss the following join types in Spark SQL:

  1. Broadcast Hash Join (BHJ)
  2. Shuffle Hash Join (SHJ)
  3. Sort Merge Join (SMJ)
  4. Broadcast Nested Loop Join (BNLJ)
  5. Cartesian Product (CP)

Solution:

1. Broadcast Hash Join (BHJ)

Use case:

Large table joins a small table. For example, a Fact Table joins a Dimension Table.

Principle:

Both Broadcast Hash Join and Shuffle Hash Join are hash joins, and the local algorithm is the same.

The names "Broadcast" and "Shuffle" only describe how the data is transferred in a distributed environment.

So let's first understand how a hash join works.

First, the hash join algorithm determines the Build Table and the Probe Table. Normally the smaller table is the Build Table and the larger table is the Probe Table.

Then the Build Table is loaded into an in-memory Hash Table, using a hash function on the join key. (Some engines can spill this Hash Table to disk; in Spark it has to fit in memory, which is why memory sizing matters below.)

Finally, the Probe Table is scanned, and the same hash function is applied to each row's join key to find the matching rows in the Hash Table.

  • Note: Above Hash Join algorithm applies to both Broadcast Hash Join and Shuffle Hash Join.
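To make the build and probe phases concrete, here is a minimal single-machine sketch in Python. It is not Spark code: rows are plain tuples, the join key is extracted by a key function, and a Python dict stands in for Spark's in-memory Hash Table.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple

Row = Tuple[Any, ...]


def hash_join(build: Iterable[Row], probe: Iterable[Row],
              build_key: Callable[[Row], Any],
              probe_key: Callable[[Row], Any]) -> List[Row]:
    """Inner hash join: build a hash table on one side, then probe it with the other."""
    # Build phase: group the (smaller) Build Table rows by join key.
    table: Dict[Any, List[Row]] = defaultdict(list)
    for row in build:
        table[build_key(row)].append(row)

    # Probe phase: scan the (larger) Probe Table once and look up each row's key.
    result: List[Row] = []
    for row in probe:
        for match in table.get(probe_key(row), []):
            result.append(match + row)
    return result


customers = [(1, "addr_10"), (2, "addr_20")]        # (c_customer_sk, c_current_addr_sk)
sales = [(100, 1), (101, 2), (102, 1), (103, 3)]   # (ss_sold_time_sk, ss_customer_sk)
joined = hash_join(customers, sales, lambda c: c[0], lambda s: s[1])
print(joined)
```

Only the Build Table is held in memory; the Probe Table is streamed once, which is why the smaller side should be the Build Table.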

In a distributed environment, Broadcast Hash Join first broadcasts the small table to every executor through the driver. Then each executor performs a local hash join between the broadcast table and its partitions of the big table.

Because the small table is collected to the driver before being broadcast, the driver's memory must be large enough to hold it.

Because each executor builds the broadcast table into an in-memory Hash Table, the executors' memory must be large enough as well.

According to BroadcastExchangeExec.scala, a broadcast table must be smaller than 8GB and have fewer than 512 million rows:

if (numRows >= 512000000) {
  throw new SparkException(
    s"Cannot broadcast the table with more than 512 millions rows: $numRows rows")
}

......

if (dataSize >= (8L << 30)) {
  throw new SparkException(
    s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
}

By default, Spark broadcasts a table only if its estimated size is at most 10MB (configured by spark.sql.autoBroadcastJoinThreshold; setting it to -1 disables automatic broadcasting).
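The two kinds of limits can be summarised in a small Python sketch: a plan-time check against spark.sql.autoBroadcastJoinThreshold (in Spark 2.4 the test is 0 <= sizeInBytes <= threshold), and the hard run-time limits from BroadcastExchangeExec.scala. A broadcast hint skips the first check but not the second. Both function names are hypothetical helpers for illustration, not a Spark API.

```python
DEFAULT_AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10MB, default of spark.sql.autoBroadcastJoinThreshold
MAX_BROADCAST_ROWS = 512_000_000                       # row limit quoted from BroadcastExchangeExec.scala
MAX_BROADCAST_BYTES = 8 << 30                          # 8GB limit quoted from BroadcastExchangeExec.scala


def can_broadcast_by_size(estimated_bytes: int,
                          threshold: int = DEFAULT_AUTO_BROADCAST_THRESHOLD) -> bool:
    """Plan-time check (hypothetical helper): estimated size must not exceed the threshold."""
    return 0 <= estimated_bytes <= threshold


def check_broadcast_limits(num_rows: int, data_size: int) -> None:
    """Run-time check (hypothetical helper) mirroring the two hard limits above."""
    if num_rows >= MAX_BROADCAST_ROWS:
        raise ValueError(
            f"Cannot broadcast the table with more than 512 millions rows: {num_rows} rows")
    if data_size >= MAX_BROADCAST_BYTES:
        raise ValueError(
            f"Cannot broadcast the table that is larger than 8GB: {data_size >> 30} GB")


print(can_broadcast_by_size(16))                # True: the 16-byte "customer" side in the tests
print(can_broadcast_by_size(16, threshold=15))  # False: threshold lowered to 15 bytes
print(can_broadcast_by_size(16, threshold=-1))  # False: -1 disables size-based broadcasting
```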

Hints:

  • BROADCAST
  • BROADCASTJOIN
  • MAPJOIN

Join Selection:

For a simple join query, how do we know which join type Spark will choose? (In the tests below, CBO is enabled so that table statistics are available.)

For example, the candidates could be Broadcast Hash Join, Shuffle Hash Join and Sort Merge Join.

The logic is explained inside SparkStrategies.scala:

* - Broadcast hash join (BHJ):
* BHJ is not supported for full outer join. For right outer join, we only can broadcast the
* left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
* we only can broadcast the right side. For inner like join, we can broadcast both sides.
* Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
* small. However, broadcasting tables is a network-intensive operation. It could cause OOM
* or perform worse than the other join algorithms, especially when the build/broadcast side
* is big.
*
* For the supported cases, users can specify the broadcast hint (e.g. the user applied the
* [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
* [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
* which join side is broadcast.
*
* 1) Broadcast the join side with the broadcast hint, even if the size is larger than
* [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
* is inner like join), the side with a smaller estimated physical size will be broadcast.
* 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
* whose estimated physical size is smaller than the threshold. If both sides are below the
* threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.

The code logic is:

// --- BroadcastHashJoin --------------------------------------------------------------------

// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
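The rules in the SparkStrategies.scala comment can be simulated in a few lines of Python. This is a simplified model of broadcastSideByHints / broadcastSideBySizes in Spark 2.4, not Spark's code: join types are plain strings, plan sizes are integers, and the function name is hypothetical. Hints are considered first; only if no usable hint exists are the estimated sizes compared against the threshold.

```python
from typing import Optional

# Join types for which each side may be broadcast (per the SparkStrategies.scala comment).
BUILD_RIGHT_OK = {"inner", "cross", "left_outer", "left_semi", "left_anti", "existence"}
BUILD_LEFT_OK = {"inner", "cross", "right_outer"}


def choose_broadcast_side(join_type: str, left_size: int, right_size: int,
                          left_hint: bool = False, right_hint: bool = False,
                          threshold: int = 10 * 1024 * 1024) -> Optional[str]:
    """Return "BuildLeft", "BuildRight", or None if Broadcast Hash Join is not used."""
    can_left = join_type in BUILD_LEFT_OK
    can_right = join_type in BUILD_RIGHT_OK

    # 1) Broadcast hints take precedence over the size threshold.
    left_ok, right_ok = can_left and left_hint, can_right and right_hint
    # 2) Without a usable hint, fall back to estimated sizes vs. the threshold.
    if not (left_ok or right_ok):
        left_ok = can_left and 0 <= left_size <= threshold
        right_ok = can_right and 0 <= right_size <= threshold

    if left_ok and right_ok:
        # Both sides qualify: broadcast the smaller one (the right side wins ties).
        return "BuildRight" if right_size <= left_size else "BuildLeft"
    if right_ok:
        return "BuildRight"
    if left_ok:
        return "BuildLeft"
    return None


STORE_SALES = 440_000_000  # roughly the 419.7 MB estimate of the filtered store_sales side

print(choose_broadcast_side("inner", 16, STORE_SALES))                                   # test a
print(choose_broadcast_side("inner", 16, STORE_SALES, left_hint=True, threshold=15))     # test b
print(choose_broadcast_side("inner", 16, STORE_SALES, True, True, threshold=15))         # test c
print(choose_broadcast_side("full_outer", 16, 16))                                       # never BHJ
```

All three test cases below resolve to BuildLeft (the filtered "customer" side), matching the BuildLeft in their physical plans, while a full outer join never qualifies.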

Test:

Here we use the TPC-DS sample data shared in this post.

a. Broadcast Hash Join is chosen because the estimated size of the smaller side (the "customer" table after the Filter) is sizeInBytes=16.0 B, which is below the default spark.sql.autoBroadcastJoinThreshold of 10MB.

set spark.sql.cbo.enabled=true;
set spark.sql.statistics.histogram.enabled=true;
use tpcds;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123";

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#11], Statistics(sizeInBytes=928.0 B, rowCount=58, hints=none)
+- Join Inner, (c_customer_sk#14 = ss_customer_sk#34), Statistics(sizeInBytes=1392.0 B, rowCount=58, hints=none)
:- Project [c_customer_sk#14, c_current_addr_sk#18], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#32, ss_customer_sk#34], Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=none)
+- Filter isnotnull(ss_customer_sk#34), Statistics(sizeInBytes=3.9 GB, rowCount=2.75E+7, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(2) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#11]
+- *(2) BroadcastHashJoin [c_customer_sk#14], [ss_customer_sk#34], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
: +- *(1) Project [c_customer_sk#14, c_current_addr_sk#18]
: +- *(1) Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#14,c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- *(2) Project [ss_sold_time_sk#32, ss_customer_sk#34]
+- *(2) Filter isnotnull(ss_customer_sk#34)
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_customer_sk#34,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_sold_time_sk:int,ss_customer_sk:int>

b. Broadcast Hash Join is still chosen after we lower spark.sql.autoBroadcastJoinThreshold to 15 bytes (below the 16-byte estimate), because the broadcast hint takes precedence over the threshold.

set spark.sql.autoBroadcastJoinThreshold=15;

explain cost select /*+ BROADCAST(c) */ c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123";

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#69], Statistics(sizeInBytes=928.0 B, rowCount=58, hints=none)
+- Join Inner, (c_customer_sk#14 = ss_customer_sk#34), Statistics(sizeInBytes=1392.0 B, rowCount=58, hints=none)
:- ResolvedHint (broadcast), Statistics(sizeInBytes=16.0 B, rowCount=1, hints=(broadcast))
: +- Project [c_customer_sk#14, c_current_addr_sk#18], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#32, ss_customer_sk#34], Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=none)
+- Filter isnotnull(ss_customer_sk#34), Statistics(sizeInBytes=3.9 GB, rowCount=2.75E+7, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(2) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#69]
+- *(2) BroadcastHashJoin [c_customer_sk#14], [ss_customer_sk#34], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
: +- *(1) Project [c_customer_sk#14, c_current_addr_sk#18]
: +- *(1) Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#14,c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- *(2) Project [ss_sold_time_sk#32, ss_customer_sk#34]
+- *(2) Filter isnotnull(ss_customer_sk#34)
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_customer_sk#34,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_sold_time_sk:int,ss_customer_sk:int>

Note the key words "ResolvedHint (broadcast)" in the Optimized Logical Plan.

c. When hints are detected on both sides, Broadcast Hash Join broadcasts the smaller side (the "customer" table after the Filter).

set spark.sql.autoBroadcastJoinThreshold=15;

explain cost select /*+ MAPJOIN(c),BROADCASTJOIN(s) */ c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123";

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#87], Statistics(sizeInBytes=928.0 B, rowCount=58, hints=none)
+- Join Inner, (c_customer_sk#14 = ss_customer_sk#34), Statistics(sizeInBytes=1392.0 B, rowCount=58, hints=none)
:- ResolvedHint (broadcast), Statistics(sizeInBytes=16.0 B, rowCount=1, hints=(broadcast))
: +- Project [c_customer_sk#14, c_current_addr_sk#18], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- ResolvedHint (broadcast), Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=(broadcast))
+- Project [ss_sold_time_sk#32, ss_customer_sk#34], Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=none)
+- Filter isnotnull(ss_customer_sk#34), Statistics(sizeInBytes=3.9 GB, rowCount=2.75E+7, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(2) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#87]
+- *(2) BroadcastHashJoin [c_customer_sk#14], [ss_customer_sk#34], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
: +- *(1) Project [c_customer_sk#14, c_current_addr_sk#18]
: +- *(1) Filter ((isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)) && isnotnull(c_customer_sk#14))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#14,c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- *(2) Project [ss_sold_time_sk#32, ss_customer_sk#34]
+- *(2) Filter isnotnull(ss_customer_sk#34)
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_customer_sk#34,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_sold_time_sk:int,ss_customer_sk:int>

Note that "ResolvedHint (broadcast)" appears on both sides of the Optimized Logical Plan. However, only the smaller side is broadcast.

Also note that here we used two different names of the same hint (MAPJOIN and BROADCASTJOIN). This makes no difference.

Query Plan in Web UI:



2. Shuffle Hash Join (SHJ)

Use case:

A large table joins another large table, but one side is significantly smaller than the other.

Principle:

It first shuffles both tables using the same hash function on the join key, so that rows with the same join key end up in the same partition.

Since the Hash Join algorithm above also applies to Shuffle Hash Join, each partition of the smaller table becomes a Build Table and is built into an in-memory Hash Table based on the hash function on the join key.

Then the local hash join runs at the partition level.

So Shuffle Hash Join puts less memory pressure on the driver and executors than Broadcast Hash Join: nothing is collected to the driver, and each executor only builds Hash Tables for its own partitions of the smaller table.
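The two phases (shuffle, then a per-partition hash join) can be sketched in Python as follows. This is a single-process simulation, not Spark code: `num_partitions` plays the role of spark.sql.shuffle.partitions, and Python's built-in `hash` stands in for Spark's partitioning hash function.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

Row = Tuple[Any, ...]


def shuffle(rows: List[Row], key: Callable[[Row], Any], num_partitions: int) -> List[List[Row]]:
    """Hash-partition rows so that equal join keys always land in the same partition."""
    partitions: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


def shuffled_hash_join(build: List[Row], probe: List[Row],
                       build_key: Callable[[Row], Any], probe_key: Callable[[Row], Any],
                       num_partitions: int) -> List[Row]:
    """Inner join: shuffle both sides, then hash-join each partition pair independently."""
    build_parts = shuffle(build, build_key, num_partitions)
    probe_parts = shuffle(probe, probe_key, num_partitions)
    result: List[Row] = []
    # Each (build, probe) partition pair corresponds to one task in Spark.
    for build_part, probe_part in zip(build_parts, probe_parts):
        # Only this partition's slice of the Build Table is held in the hash table.
        table: Dict[Any, List[Row]] = defaultdict(list)
        for row in build_part:
            table[build_key(row)].append(row)
        for row in probe_part:
            for match in table.get(probe_key(row), []):
                result.append(match + row)
    return result


customers = [(1, "addr_10"), (2, "addr_20")]
sales = [(100, 1), (101, 2), (102, 1), (103, 3)]
result = sorted(shuffled_hash_join(customers, sales, lambda c: c[0], lambda s: s[1], 2))
print(result)
```

Because equal keys are co-located by the shuffle, joining partition by partition gives the same rows as a single global hash join; the output order simply depends on the partitioning.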

However, if both sides are very large, a hash join may not be the best candidate, whether it is Shuffle Hash Join or Broadcast Hash Join. In this case, Spark prefers Sort Merge Join, which outperforms them in most large-table joins.

Hints:

No hints in Spark 2.4.
Starting from Spark 3.0, a new hint is available:
  • SHUFFLE_HASH

Join Selection:

The logic is explained inside SparkStrategies.scala.

1. If Broadcast Hash Join is disabled or its conditions are not met (e.g. both sides are larger than spark.sql.autoBroadcastJoinThreshold), Spark chooses Sort Merge Join by default.

So to force Spark to choose Shuffle Hash Join, the first step is to turn off the Sort Merge Join preference by setting spark.sql.join.preferSortMergeJoin=false. Otherwise you will not see Shuffle Hash Join. Note that this parameter does not disable Sort Merge Join completely, so do not assume Sort Merge Join is gone after setting it to false. I will explain more later.

2. Since it is a hash join, each partition of the build side must be small enough for a local hash map, so the following condition has to be met:

private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

This means that if you disable Broadcast Hash Join by setting spark.sql.autoBroadcastJoinThreshold=-1 (or even 0), you will never see Shuffle Hash Join: sizeInBytes is never negative, so it can never be smaller than threshold * partitions.

So the trick is to set spark.sql.autoBroadcastJoinThreshold small enough to rule out Broadcast Hash Join, yet large enough, relative to the number of shuffle partitions (spark.sql.shuffle.partitions), for the above condition to hold.

To illustrate, let's reuse the join query above.

From the earlier explain plan with cost, we know that the smaller side is estimated at 16 bytes.

If spark.sql.autoBroadcastJoinThreshold>=16, then Broadcast Hash Join will be chosen.

If spark.sql.autoBroadcastJoinThreshold=8 (or smaller) and spark.sql.shuffle.partitions=2, then Sort Merge Join will be chosen.

If spark.sql.autoBroadcastJoinThreshold=9 (or larger, but below 16) and spark.sql.shuffle.partitions=2, then Shuffle Hash Join will finally be chosen.

This is because 9 * 2 = 18 > 16 bytes, so canBuildLocalHashMap returns true, while 9 < 16 bytes, so Broadcast Hash Join is ruled out. With a threshold of 8, 8 * 2 = 16 is not greater than 16 bytes, so canBuildLocalHashMap returns false.
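The arithmetic above can be checked with a tiny Python model of the two size checks involved (a simplification of canBroadcast and canBuildLocalHashMap in Spark 2.4; the function name is hypothetical). It assumes spark.sql.join.preferSortMergeJoin=false and that the 3X condition discussed next holds, so it only shows which join the sizes alone allow.

```python
def size_checks(build_bytes: int, threshold: int, shuffle_partitions: int) -> str:
    """Which join the two size checks allow for the build side (simplified, hypothetical).

    Assumes preferSortMergeJoin=false and that the 3X condition holds.
    """
    if 0 <= build_bytes <= threshold:
        return "BroadcastHashJoin"   # canBroadcast: estimated size fits under the threshold
    if build_bytes < threshold * shuffle_partitions:
        return "ShuffledHashJoin"    # canBuildLocalHashMap: each partition is small enough
    return "SortMergeJoin"


for threshold in (16, 9, 8, -1):
    print(threshold, size_checks(16, threshold, shuffle_partitions=2))
```

With the 16-byte "customer" side and 2 shuffle partitions, this prints BroadcastHashJoin for 16, ShuffledHashJoin for 9, and SortMergeJoin for 8 and -1, matching the cases listed above.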

3. Another condition that must be met to trigger Shuffle Hash Join is: Build Table Size * 3 <= Probe Table Size.

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}

What does this mean? Let's use the query above as an example.

The Build Table Size is 16 bytes and the Probe Table Size is 419.7MB. That is why Shuffle Hash Join is possible: 16 bytes * 3 <= 419.7MB.

If this condition fails, you will NEVER see Shuffle Hash Join, even when conditions #1 and #2 above are met.

In short, all 3 conditions above must be met to trigger Shuffle Hash Join. Now you know why Spark usually ends up with Sort Merge Join rather than Shuffle Hash Join.
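Putting the three conditions together, the following Python sketch models Spark 2.4's choice between Shuffle Hash Join and Sort Merge Join for an inner equi-join once Broadcast Hash Join has been ruled out. It is a simplification with a hypothetical function name and integer sizes, not Spark's code; as in SparkStrategies.scala, the right side is tried as the build side before the left side, and non-orderable join keys force Shuffle Hash Join.

```python
def shj_or_smj(left_bytes: int, right_bytes: int, threshold: int,
               shuffle_partitions: int, prefer_sort_merge_join: bool = True,
               keys_orderable: bool = True) -> str:
    """Simplified Spark 2.4 choice for an inner equi-join after Broadcast Hash Join is ruled out."""

    def can_build_local_hash_map(size: int) -> bool:  # condition #2
        return size < threshold * shuffle_partitions

    def much_smaller(a: int, b: int) -> bool:          # condition #3
        return a * 3 <= b

    # The right side is tried as the build side first, then the left side.
    for build, other, side in ((right_bytes, left_bytes, "BuildRight"),
                               (left_bytes, right_bytes, "BuildLeft")):
        if (not prefer_sort_merge_join                 # condition #1
                and can_build_local_hash_map(build)
                and much_smaller(build, other)) or not keys_orderable:
            return f"ShuffledHashJoin {side}"
    return "SortMergeJoin"


STORE_SALES = 440_000_000  # roughly the 419.7 MB estimate of the filtered store_sales side

print(shj_or_smj(16, STORE_SALES, 8, 2, prefer_sort_merge_join=False))  # test a
print(shj_or_smj(16, STORE_SALES, 9, 2, prefer_sort_merge_join=False))  # test b
print(shj_or_smj(16, 16, 9, 2, prefer_sort_merge_join=False))           # test c
```

The three calls reproduce the outcomes of tests a, b and c below: SortMergeJoin, ShuffledHashJoin BuildLeft, and SortMergeJoin. With the default prefer_sort_merge_join=True, the model only returns Shuffle Hash Join for non-orderable keys.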

Test:

Here we use some test queries to verify the Join Selection logic derived from the source code above.

a. Sort Merge Join is chosen because 16 bytes >= spark.sql.autoBroadcastJoinThreshold (8) * spark.sql.shuffle.partitions (2) = 16 bytes, so canBuildLocalHashMap returns false.

set spark.sql.cbo.enabled=true;
set spark.sql.statistics.histogram.enabled=true;
use tpcds;

set spark.sql.join.preferSortMergeJoin=false;
set spark.sql.autoBroadcastJoinThreshold=8;
set spark.sql.shuffle.partitions=2;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123";

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#235], Statistics(sizeInBytes=928.0 B, rowCount=58, hints=none)
+- Join Inner, (c_customer_sk#27 = ss_customer_sk#47), Statistics(sizeInBytes=1392.0 B, rowCount=58, hints=none)
:- Project [c_customer_sk#27, c_current_addr_sk#31], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#27,c_customer_id#28,c_current_cdemo_sk#29,c_current_hdemo_sk#30,c_current_addr_sk#31,c_first_shipto_date_sk#32,c_first_sales_date_sk#33,c_salutation#34,c_first_name#35,c_last_name#36,c_preferred_cust_flag#37,c_birth_day#38,c_birth_month#39,c_birth_year#40,c_birth_country#41,c_login#42,c_email_address#43,c_last_review_date#44] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#45, ss_customer_sk#47], Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=none)
+- Filter isnotnull(ss_customer_sk#47), Statistics(sizeInBytes=3.9 GB, rowCount=2.75E+7, hints=none)
+- Relation[ss_sold_time_sk#45,ss_item_sk#46,ss_customer_sk#47,ss_cdemo_sk#48,ss_hdemo_sk#49,ss_addr_sk#50,ss_store_sk#51,ss_promo_sk#52,ss_ticket_number#53L,ss_quantity#54,ss_wholesale_cost#55,ss_list_price#56,ss_sales_price#57,ss_ext_discount_amt#58,ss_ext_sales_price#59,ss_ext_wholesale_cost#60,ss_ext_list_price#61,ss_ext_tax#62,ss_coupon_amt#63,ss_net_paid#64,ss_net_paid_inc_tax#65,ss_net_profit#66,ss_sold_date_sk#67] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(5) Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#235]
+- *(5) SortMergeJoin [c_customer_sk#27], [ss_customer_sk#47], Inner
:- *(2) Sort [c_customer_sk#27 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c_customer_sk#27, 2)
: +- *(1) Project [c_customer_sk#27, c_current_addr_sk#31]
: +- *(1) Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#27,c_customer_id#28,c_current_addr_sk#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- *(4) Sort [ss_customer_sk#47 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ss_customer_sk#47, 2)
+- *(3) Project [ss_sold_time_sk#45, ss_customer_sk#47]
+- *(3) Filter isnotnull(ss_customer_sk#47)
+- *(3) FileScan parquet tpcds.store_sales[ss_sold_time_sk#45,ss_customer_sk#47,ss_sold_date_sk#67] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_sold_time_sk:int,ss_customer_sk:int>

b. Shuffle Hash Join is chosen because 16 bytes < spark.sql.autoBroadcastJoinThreshold (9) * spark.sql.shuffle.partitions (2) = 18 bytes, and 16 bytes * 3 <= 419.7MB.

set spark.sql.join.preferSortMergeJoin=false;
set spark.sql.autoBroadcastJoinThreshold=9;
set spark.sql.shuffle.partitions=2;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123";

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#250], Statistics(sizeInBytes=928.0 B, rowCount=58, hints=none)
+- Join Inner, (c_customer_sk#27 = ss_customer_sk#47), Statistics(sizeInBytes=1392.0 B, rowCount=58, hints=none)
:- Project [c_customer_sk#27, c_current_addr_sk#31], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#27,c_customer_id#28,c_current_cdemo_sk#29,c_current_hdemo_sk#30,c_current_addr_sk#31,c_first_shipto_date_sk#32,c_first_sales_date_sk#33,c_salutation#34,c_first_name#35,c_last_name#36,c_preferred_cust_flag#37,c_birth_day#38,c_birth_month#39,c_birth_year#40,c_birth_country#41,c_login#42,c_email_address#43,c_last_review_date#44] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#45, ss_customer_sk#47], Statistics(sizeInBytes=419.7 MB, rowCount=2.75E+7, hints=none)
+- Filter isnotnull(ss_customer_sk#47), Statistics(sizeInBytes=3.9 GB, rowCount=2.75E+7, hints=none)
+- Relation[ss_sold_time_sk#45,ss_item_sk#46,ss_customer_sk#47,ss_cdemo_sk#48,ss_hdemo_sk#49,ss_addr_sk#50,ss_store_sk#51,ss_promo_sk#52,ss_ticket_number#53L,ss_quantity#54,ss_wholesale_cost#55,ss_list_price#56,ss_sales_price#57,ss_ext_discount_amt#58,ss_ext_sales_price#59,ss_ext_wholesale_cost#60,ss_ext_list_price#61,ss_ext_tax#62,ss_coupon_amt#63,ss_net_paid#64,ss_net_paid_inc_tax#65,ss_net_profit#66,ss_sold_date_sk#67] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(3) Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#250]
+- ShuffledHashJoin [c_customer_sk#27], [ss_customer_sk#47], Inner, BuildLeft
:- Exchange hashpartitioning(c_customer_sk#27, 2)
: +- *(1) Project [c_customer_sk#27, c_current_addr_sk#31]
: +- *(1) Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#27,c_customer_id#28,c_current_addr_sk#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- Exchange hashpartitioning(ss_customer_sk#47, 2)
+- *(2) Project [ss_sold_time_sk#45, ss_customer_sk#47]
+- *(2) Filter isnotnull(ss_customer_sk#47)
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#45,ss_customer_sk#47,ss_sold_date_sk#67] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_sold_time_sk:int,ss_customer_sk:int>

c. Sort Merge Join is chosen because Build Table Size 16 bytes * 3 > Probe Table Size 16 bytes, so the 3X condition fails.

Here we added more filters on the Probe Table side ("store_sales") so that its estimated size also drops to 16 bytes.

set spark.sql.join.preferSortMergeJoin=false;
set spark.sql.autoBroadcastJoinThreshold=9;
set spark.sql.shuffle.partitions=2;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_sk = s.ss_customer_sk
and c.c_customer_id = "123"
and s.ss_item_sk=456 and s.ss_hdemo_sk=789;

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#265], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
+- Join Inner, (c_customer_sk#27 = ss_customer_sk#47), Statistics(sizeInBytes=24.0 B, rowCount=1, hints=none)
:- Project [c_customer_sk#27, c_current_addr_sk#31], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
: +- Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#27,c_customer_id#28,c_current_cdemo_sk#29,c_current_hdemo_sk#30,c_current_addr_sk#31,c_first_shipto_date_sk#32,c_first_sales_date_sk#33,c_salutation#34,c_first_name#35,c_last_name#36,c_preferred_cust_flag#37,c_birth_day#38,c_birth_month#39,c_birth_year#40,c_birth_country#41,c_login#42,c_email_address#43,c_last_review_date#44] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#45, ss_customer_sk#47], Statistics(sizeInBytes=16.0 B, rowCount=1, hints=none)
+- Filter ((((isnotnull(ss_item_sk#46) && isnotnull(ss_hdemo_sk#49)) && (ss_item_sk#46 = 456)) && (ss_hdemo_sk#49 = 789)) && isnotnull(ss_customer_sk#47)), Statistics(sizeInBytes=152.0 B, rowCount=1, hints=none)
+- Relation[ss_sold_time_sk#45,ss_item_sk#46,ss_customer_sk#47,ss_cdemo_sk#48,ss_hdemo_sk#49,ss_addr_sk#50,ss_store_sk#51,ss_promo_sk#52,ss_ticket_number#53L,ss_quantity#54,ss_wholesale_cost#55,ss_list_price#56,ss_sales_price#57,ss_ext_discount_amt#58,ss_ext_sales_price#59,ss_ext_wholesale_cost#60,ss_ext_list_price#61,ss_ext_tax#62,ss_coupon_amt#63,ss_net_paid#64,ss_net_paid_inc_tax#65,ss_net_profit#66,ss_sold_date_sk#67] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(5) Project [c_current_addr_sk#31, ss_sold_time_sk#45 AS c2#265]
+- *(5) SortMergeJoin [c_customer_sk#27], [ss_customer_sk#47], Inner
:- *(2) Sort [c_customer_sk#27 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c_customer_sk#27, 2)
: +- *(1) Project [c_customer_sk#27, c_current_addr_sk#31]
: +- *(1) Filter ((isnotnull(c_customer_id#28) && (c_customer_id#28 = 123)) && isnotnull(c_customer_sk#27))
: +- *(1) FileScan parquet tpcds.customer[c_customer_sk#27,c_customer_id#28,c_current_addr_sk#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123), IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_addr_sk:int>
+- *(4) Sort [ss_customer_sk#47 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ss_customer_sk#47, 2)
+- *(3) Project [ss_sold_time_sk#45, ss_customer_sk#47]
+- *(3) Filter ((((isnotnull(ss_item_sk#46) && isnotnull(ss_hdemo_sk#49)) && (ss_item_sk#46 = 456)) && (ss_hdemo_sk#49 = 789)) && isnotnull(ss_customer_sk#47))
+- *(3) FileScan parquet tpcds.store_sales[ss_sold_time_sk#45,ss_item_sk#46,ss_customer_sk#47,ss_hdemo_sk#49,ss_sold_date_sk#67] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), IsNotNull(ss_hdemo_sk), EqualTo(ss_item_sk,456), EqualTo(ss_hdemo_sk,789)..., ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int,ss_customer_sk:int,ss_hdemo_sk:int>

Query Plan in Web UI:

3. Sort Merge Join (SMJ)

Use case:

A large table joins another large table, especially when both tables are very large.

Principle:

It first shuffles both tables using the same hash function on the join key, so that rows with the same join key end up in the same partition.

Then the partitions on both sides are sorted by the join key.

Finally, the sorted partitions from both sides are merged by iterating over their rows in join-key order.

In summary, it has 3 steps -- shuffle, sort and merge.
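The sort and merge steps can be sketched in Python for one partition pair, assuming the shuffle step described above has already co-located equal keys. This is an illustrative single-process model (the function name is hypothetical), not Spark's SortMergeJoinExec; it handles duplicate keys on both sides by joining the matching runs.

```python
from typing import Any, Callable, List, Tuple

Row = Tuple[Any, ...]


def sort_merge_join(left: List[Row], right: List[Row],
                    left_key: Callable[[Row], Any],
                    right_key: Callable[[Row], Any]) -> List[Row]:
    """Inner sort-merge join of two co-partitioned inputs."""
    # Sort step: order both sides by join key (Spark can spill this step to disk).
    left = sorted(left, key=left_key)
    right = sorted(right, key=right_key)

    result: List[Row] = []
    i = j = 0
    # Merge step: advance whichever side currently has the smaller key.
    while i < len(left) and j < len(right):
        lk, rk = left_key(left[i]), right_key(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side and emit their cross product.
            i_end = i
            while i_end < len(left) and left_key(left[i_end]) == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right_key(right[j_end]) == rk:
                j_end += 1
            for l_row in left[i:i_end]:
                for r_row in right[j:j_end]:
                    result.append(l_row + r_row)
            i, j = i_end, j_end
    return result


customers = [(2, "addr_20"), (1, "addr_10")]
sales = [(100, 1), (103, 3), (101, 2), (102, 1)]
merged = sort_merge_join(customers, sales, lambda c: c[0], lambda s: s[1])
print(merged)
```

The merge relies on `<` and `>` comparisons between keys, which is also why Sort Merge Join requires orderable join keys.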

Because Sort Merge Join does not need to build a hash table in memory, and the sort step can spill data to disk, it puts much less memory pressure on the driver and executors.

Since Sort Merge Join outperforms Shuffle Hash Join in most large-table-to-large-table joins, it is the preferred join type, controlled by spark.sql.join.preferSortMergeJoin=true (the default).

Hints:

No hints in Spark 2.4.
Starting from Spark 3.0, the following hints are introduced:
  • SHUFFLE_MERGE
  • MERGE
  • MERGEJOIN

Join Selection:

The logic is explained inside SparkStrategies.scala:
* - Sort merge: if the matching join keys are sortable.

The code logic is:

      // --- SortMergeJoin ------------------------------------------------------------

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

Since this is the preferred join type, the only condition is that the join keys have to be sortable/order-able.

That is why I dug into all Spark SQL data types in this post to figure out which data types are order-able. Please read that post first.

But do not worry: most of the common data types are order-able, such as String, Integer, Long, Timestamp, Date, etc. So in most scenarios, this condition should be met.

Of course, if this condition is not met, the planner can fall back to Shuffle Hash Join.
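To make the orderability condition concrete, here is a toy model of the check. It assumes a hypothetical, heavily simplified DataType ADT (not the real org.apache.spark.sql.types classes) and only mirrors the general shape of Spark 2.4's RowOrdering.isOrderable: atomic types are orderable, structs and arrays are orderable when their parts are, and the remaining types here (MapType and, in 2.4, CalendarIntervalType) are not.

```scala
// Toy model of the orderability check behind Sort Merge Join selection.
// Assumption: this DataType ADT is a hypothetical stand-in, far smaller than
// Spark's real type hierarchy (no NullType, UDTs, decimals, etc.).
object OrderabilitySketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType
  case object DateType extends DataType
  case object TimestampType extends DataType
  case object CalendarIntervalType extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class MapType(keyType: DataType, valueType: DataType) extends DataType
  final case class StructType(fieldTypes: Seq[DataType]) extends DataType

  // Atomic types sort natively; containers sort if every part sorts.
  def isOrderable(dt: DataType): Boolean = dt match {
    case IntegerType | LongType | StringType | DateType | TimestampType => true
    case ArrayType(elem)                   => isOrderable(elem)
    case StructType(fields)                => fields.forall(isOrderable)
    case CalendarIntervalType | _: MapType => false
  }

  // Sort Merge Join is only considered when every join key type is orderable.
  def smjEligible(joinKeyTypes: Seq[DataType]): Boolean =
    joinKeyTypes.forall(isOrderable)
}
```

In this model a join on a CalendarIntervalType key is not eligible for Sort Merge Join, which is the situation the test below tries to reproduce.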

Test:

Please refer to the tests above, which include Sort Merge Join.

Initially I wanted to test a join key on a column which is NOT sortable/order-able, so I chose CalendarIntervalType as the join key:

set spark.sql.autoBroadcastJoinThreshold=1;
set spark.sql.join.preferSortMergeJoin=true;

explain cost
with new_customer as
(select c_current_addr_sk,c_customer_sk,c_customer_id, interval 1 second as newcol from customer),
new_store_sales as
(select ss_sold_time_sk,ss_customer_sk, interval 1 second as newcol from store_sales)
select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from new_customer c, new_store_sales s
where c.newcol = s.newcol
and c.c_customer_id = "123";

However, it reported the following error:

Error in query: cannot resolve '(c.`newcol` = s.`newcol`)' due to data type mismatch: EqualTo does not support ordering on type calendarinterval; line 7 pos 6;

This is because the test was run on Spark 2.4, while, as per the doc mentioned above, "Calendar intervals support comparison and ordering since 3.0.0".

So starting from Spark 3.0, the query above should at least run, but Sort Merge Join will NOT be chosen because CalendarIntervalType is NOT sortable/order-able.

Query Plan in Web UI:

 

4. Broadcast Nested Loop Join (BNLJ)

Use case:

Cartesian product between a small table and a large table.

Principle:

It first broadcasts the small table to each Executor node through the Driver, and then performs a local nested loop join with each partition of the big table.

Since the broadcast small table goes through the Driver, the Driver's memory must be large enough to hold it.

By default in Spark, a table/data set whose estimated size is at most 10MB (configured by spark.sql.autoBroadcastJoinThreshold) can be broadcast.
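A minimal sketch of the broadcast-then-nested-loop idea, assuming in-memory Seqs stand in for the broadcast table and for the big table's partitions (the object and method names are hypothetical, not Spark's BroadcastNestedLoopJoinExec). The join condition can be any predicate, which is why BNLJ works without an equi-join key, and each task compares every local row against every broadcast row.

```scala
// Single-process sketch of Broadcast Nested Loop Join (inner join).
// Assumptions: the broadcast side is a plain Seq every "task" can read, the big
// side is a Seq of partitions, and the condition is an arbitrary predicate.
object BroadcastNestedLoopJoinSketch {
  def join[A, B](
      bigPartitions: Seq[Seq[A]],
      broadcastSide: Seq[B],
      condition: (A, B) => Boolean): Seq[Seq[(A, B)]] =
    // One task per big-side partition; each task does
    // |partition| * |broadcastSide| comparisons.
    bigPartitions.map { partition =>
      for {
        a <- partition
        b <- broadcastSide
        if condition(a, b)
      } yield (a, b)
    }
}
```

For example, joining partitions Seq(1, 5) and Seq(10) against a broadcast Seq(3, 7) on a > b yields Seq((5, 3)) for the first task and Seq((10, 3), (10, 7)) for the second. With a condition that is always true, the result degenerates into a Cartesian product.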

Hints:

In Spark 2.4:
  • BROADCAST
  • BROADCASTJOIN
  • MAPJOIN

Join Selection:

The logic is explained inside SparkStrategies.scala:
* If there is no joining keys, Join implementations are chosen with the following precedence:
* - BroadcastNestedLoopJoin (BNLJ):
* BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
* For right outer join, the left side is broadcast. For left outer, left semi, left anti
* and the internal join type ExistenceJoin, the right side is broadcast. For inner like
* joins, either side is broadcast.
*
* Like BHJ, users still can specify the broadcast hint and session-based
* [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast.
*
* 1) Broadcast the join side with the broadcast hint, even if the size is larger than
* [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
* inner-like join), the side with a smaller estimated physical size will be broadcast.
* 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
* whose estimated physical size is smaller than the threshold. If both sides are below the
* threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.

The code logic is:

      // --- Without joining keys ------------------------------------------------------------

      // Pick BroadcastNestedLoopJoin if one side could be broadcast
      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

Basically it is very similar to Broadcast Hash Join; the only difference is that there is no equi-join key (any remaining condition is evaluated inside the nested loop).

The broadcast-related hints also control which table is broadcast; without a hint, the smaller table is broadcast if it is under the threshold.
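The build-side rules quoted from SparkStrategies.scala can be modeled as a small pure function. This is a simplified sketch: the JoinType, BuildSide and PlanStats types are hypothetical stand-ins (ExistenceJoin is left out), sizes are the optimizer's estimated sizeInBytes, and None means that neither the hint rule nor the size rule picks BNLJ.

```scala
// Toy model of how the BNLJ build (broadcast) side is chosen, following the
// SparkStrategies.scala comment above. All types here are hypothetical.
object BnljBuildSideSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object FullOuter extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  final case class PlanStats(sizeInBytes: Long, broadcastHint: Boolean)

  // Right outer can only broadcast the left side; left outer/semi/anti only the
  // right side; inner-like joins either side. Full outer passes neither rule here.
  def canBuildLeft(jt: JoinType): Boolean = jt match {
    case Inner | Cross | RightOuter => true
    case _                          => false
  }

  def canBuildRight(jt: JoinType): Boolean = jt match {
    case Inner | Cross | LeftOuter | LeftSemi | LeftAnti => true
    case _                                               => false
  }

  // When both sides qualify, broadcast the smaller one (ties go to the right).
  private def pick(buildLeft: Boolean, buildRight: Boolean,
                   left: PlanStats, right: PlanStats): Option[BuildSide] =
    if (buildLeft && buildRight)
      Some(if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft)
    else if (buildRight) Some(BuildRight)
    else if (buildLeft) Some(BuildLeft)
    else None

  def chooseBuildSide(jt: JoinType, left: PlanStats, right: PlanStats,
                      autoBroadcastJoinThreshold: Long): Option[BuildSide] = {
    def fits(s: PlanStats): Boolean =
      s.sizeInBytes >= 0 && s.sizeInBytes <= autoBroadcastJoinThreshold
    // 1) Hints win, even if the hinted side is larger than the threshold.
    val byHints = pick(canBuildLeft(jt) && left.broadcastHint,
                       canBuildRight(jt) && right.broadcastHint, left, right)
    // 2) Otherwise only sides within the threshold qualify.
    byHints.orElse(pick(canBuildLeft(jt) && fits(left),
                        canBuildRight(jt) && fits(right), left, right))
  }
}
```

Feeding in the estimated sizes from the tests below (12 B for the filtered customer side, about 3.5 KB for the filtered store_sales side), this model returns BuildLeft with no hint, BuildRight with a hint on store_sales, and None when the threshold is 1, which lines up with the three plans shown.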

Test:

a. Broadcast Nested Loop Join is chosen and the smaller table (after the Filter on "customer") is broadcast.

set spark.sql.crossJoin.enabled=true;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_id = "123"
and s.ss_item_sk=456;

Explain plan with cost:

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#95], Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
+- Join Inner, Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
:- Project [c_current_addr_sk#18], Statistics(sizeInBytes=12.0 B, rowCount=1, hints=none)
: +- Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#32], Statistics(sizeInBytes=3.5 KB, rowCount=296, hints=none)
+- Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456)), Statistics(sizeInBytes=43.9 KB, rowCount=296, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(3) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#95]
+- BroadcastNestedLoopJoin BuildLeft, Inner
:- BroadcastExchange IdentityBroadcastMode
: +- *(1) Project [c_current_addr_sk#18]
: +- *(1) Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123))
: +- *(1) FileScan parquet tpcds.customer[c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123)], ReadSchema: struct<c_customer_id:string,c_current_addr_sk:int>
+- *(2) Project [ss_sold_time_sk#32]
+- *(2) Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456))
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_item_sk#33,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), EqualTo(ss_item_sk,456)], ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int>

Note: here we have to set spark.sql.crossJoin.enabled=true; otherwise Spark is "smart" enough to error out:

Error in query: Detected implicit cartesian product for INNER join between logical plans

Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these
relations, or: enable implicit cartesian products by setting the configuration
variable spark.sql.crossJoin.enabled=true;

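b. Broadcast Nested Loop Join is chosen and the larger table ("store_sales") is broadcast because we force it using a hint.

In this case, the store_sales table itself (4.1 GB before the filter, per the statistics below) is even larger than spark.sql.autoBroadcastJoinThreshold, and even after the filter its side (3.5 KB) is larger than the customer side (12 B), yet the hint makes it the broadcast side.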

set spark.sql.crossJoin.enabled=true;

explain cost select /*+ BROADCAST(s) */ c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_id = "123"
and s.ss_item_sk=456;

Explain plan with cost: 

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#102], Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
+- Join Inner, Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
:- Project [c_current_addr_sk#18], Statistics(sizeInBytes=12.0 B, rowCount=1, hints=none)
: +- Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- ResolvedHint (broadcast), Statistics(sizeInBytes=3.5 KB, rowCount=296, hints=(broadcast))
+- Project [ss_sold_time_sk#32], Statistics(sizeInBytes=3.5 KB, rowCount=296, hints=none)
+- Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456)), Statistics(sizeInBytes=43.9 KB, rowCount=296, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(3) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#102]
+- BroadcastNestedLoopJoin BuildRight, Inner
:- *(1) Project [c_current_addr_sk#18]
: +- *(1) Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123))
: +- *(1) FileScan parquet tpcds.customer[c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123)], ReadSchema: struct<c_customer_id:string,c_current_addr_sk:int>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) Project [ss_sold_time_sk#32]
+- *(2) Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456))
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_item_sk#33,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), EqualTo(ss_item_sk,456)], ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int>

c. Cartesian Product is chosen because there are no broadcast-related hints and both sides are larger than spark.sql.autoBroadcastJoinThreshold.

set spark.sql.crossJoin.enabled=true;
set spark.sql.autoBroadcastJoinThreshold=1;

explain cost select c.c_current_addr_sk,s.ss_sold_time_sk as c2 from customer c, store_sales s
where c.c_customer_id = "123"
and s.ss_item_sk=456;

Explain plan with cost:  

== Optimized Logical Plan ==
Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#113], Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
+- Join Inner, Statistics(sizeInBytes=4.6 KB, rowCount=296, hints=none)
:- Project [c_current_addr_sk#18], Statistics(sizeInBytes=12.0 B, rowCount=1, hints=none)
: +- Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123)), Statistics(sizeInBytes=250.0 B, rowCount=1, hints=none)
: +- Relation[c_customer_sk#14,c_customer_id#15,c_current_cdemo_sk#16,c_current_hdemo_sk#17,c_current_addr_sk#18,c_first_shipto_date_sk#19,c_first_sales_date_sk#20,c_salutation#21,c_first_name#22,c_last_name#23,c_preferred_cust_flag#24,c_birth_day#25,c_birth_month#26,c_birth_year#27,c_birth_country#28,c_login#29,c_email_address#30,c_last_review_date#31] parquet, Statistics(sizeInBytes=119.2 MB, rowCount=5.00E+5, hints=none)
+- Project [ss_sold_time_sk#32], Statistics(sizeInBytes=3.5 KB, rowCount=296, hints=none)
+- Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456)), Statistics(sizeInBytes=43.9 KB, rowCount=296, hints=none)
+- Relation[ss_sold_time_sk#32,ss_item_sk#33,ss_customer_sk#34,ss_cdemo_sk#35,ss_hdemo_sk#36,ss_addr_sk#37,ss_store_sk#38,ss_promo_sk#39,ss_ticket_number#40L,ss_quantity#41,ss_wholesale_cost#42,ss_list_price#43,ss_sales_price#44,ss_ext_discount_amt#45,ss_ext_sales_price#46,ss_ext_wholesale_cost#47,ss_ext_list_price#48,ss_ext_tax#49,ss_coupon_amt#50,ss_net_paid#51,ss_net_paid_inc_tax#52,ss_net_profit#53,ss_sold_date_sk#54] parquet, Statistics(sizeInBytes=4.1 GB, rowCount=2.88E+7, hints=none)

== Physical Plan ==
*(3) Project [c_current_addr_sk#18, ss_sold_time_sk#32 AS c2#113]
+- CartesianProduct
:- *(1) Project [c_current_addr_sk#18]
: +- *(1) Filter (isnotnull(c_customer_id#15) && (c_customer_id#15 = 123))
: +- *(1) FileScan parquet tpcds.customer[c_customer_id#15,c_current_addr_sk#18] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs:///tpcds/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_id), EqualTo(c_customer_id,123)], ReadSchema: struct<c_customer_id:string,c_current_addr_sk:int>
+- *(2) Project [ss_sold_time_sk#32]
+- *(2) Filter (isnotnull(ss_item_sk#33) && (ss_item_sk#33 = 456))
+- *(2) FileScan parquet tpcds.store_sales[ss_sold_time_sk#32,ss_item_sk#33,ss_sold_date_sk#54] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs:///tpcds/store_sales], PartitionCount: 1824, PartitionFilters: [], PushedFilters: [IsNotNull(ss_item_sk), EqualTo(ss_item_sk,456)], ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int>

Query Plan in Web UI:

5. Cartesian Product (CP)

Use case:

Cartesian product between small tables, or between large tables (there is no other choice, since broadcasting would need too much memory).

Principle:

This is just a normal Cartesian Product: N rows CP M rows results in N*M rows.

Say tableA CP tableB, where tableA has N partitions and tableB has M partitions: by default, it will spawn N*M tasks.
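The task layout can be sketched as follows, assuming each partition is a plain Seq and each (left partition, right partition) pair stands in for one task. The names are hypothetical, and no spilling (which the parameters below control in Spark) is modeled.

```scala
// Sketch of CartesianProduct's task layout: one "task" per pair of
// (left partition, right partition), so N x M partitions give N * M tasks.
object CartesianProductSketch {
  final case class Task[A, B](leftIndex: Int, rightIndex: Int, output: Seq[(A, B)])

  def cartesian[A, B](left: Seq[Seq[A]], right: Seq[Seq[B]]): Seq[Task[A, B]] =
    for {
      (lp, i) <- left.zipWithIndex
      (rp, j) <- right.zipWithIndex
    } yield Task(i, j, for (a <- lp; b <- rp) yield (a, b))
}
```

For example, 2 left partitions holding 3 rows and 3 right partitions holding 3 rows produce 6 tasks and 9 output rows in total.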

Two parameters are related to CP's spill feature; I will dig into them in more detail in a later post.

  • spark.sql.cartesianProductExec.buffer.in.memory.threshold
  • spark.sql.cartesianProductExec.buffer.spill.threshold

Hints:

No hints in Spark 2.4.
Starting from Spark 3.0, a new hint is introduced:
  • SHUFFLE_REPLICATE_NL

Join Selection:

The logic is explained inside SparkStrategies.scala:

* - CartesianProduct: for inner like join, CartesianProduct is the fallback option.

Basically it only applies to inner-like joins (Inner and Cross), and it is a fallback option used when neither side can be broadcast.
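Putting the BNLJ rules and this fallback together, the order in which Spark 2.4 tries operators for a join without equi-join keys can be modeled roughly as below. It is a toy sketch: the two broadcast checks are passed in as precomputed booleans, the JoinType ADT is hypothetical, and the returned strings are just labels.

```scala
// Toy model of operator precedence for joins WITHOUT equi-join keys.
// Assumption: canBroadcastByHints / canBroadcastBySizes are already computed.
object NoKeyJoinSelectionSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object FullOuter extends JoinType

  def isInnerLike(jt: JoinType): Boolean = jt == Inner || jt == Cross

  // Order: BNLJ by hints, BNLJ by sizes, CartesianProduct for inner-like joins,
  // and finally BNLJ as a last resort (which can be very slow or run out of memory).
  def choose(jt: JoinType, canBroadcastByHints: Boolean, canBroadcastBySizes: Boolean): String =
    if (canBroadcastByHints) "BroadcastNestedLoopJoin (side chosen by hints)"
    else if (canBroadcastBySizes) "BroadcastNestedLoopJoin (side chosen by sizes)"
    else if (isInnerLike(jt)) "CartesianProduct"
    else "BroadcastNestedLoopJoin (fallback)"
}
```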

Test:

(See test c in the Broadcast Nested Loop Join (BNLJ) section above.)

Query Plan in Web UI:


 

In all, the tests and code analysis above are based on Spark 2.4. Since each Spark version brings many changes to the query optimizer, please always refer to the source code of the Spark version you are using.

