Thursday, May 21, 2015

Drill Workshop -- Understanding Joins

Env:

Drill 1.0

Theory:

Drill uses distributed and broadcast joins to join tables.
  • planner.broadcast_factor:  Controls the cost of doing a broadcast when performing a join. The lower the setting, the cheaper it is to do a broadcast join compared to other types of distribution for a join, such as a hash distribution.
    Default: 1
  • planner.broadcast_threshold:  Threshold, in terms of a number of rows, that determines whether a broadcast join is chosen for a query.
    Default: 10000000
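These two options can be inspected at runtime through Drill's sys.options system table; a minimal sketch (column names follow the standard sys.options schema, where broadcast_factor is a float option and broadcast_threshold a long option):

select name, num_val, float_val
from sys.options
where name in ('planner.broadcast_factor', 'planner.broadcast_threshold');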

Goal:

Understand the join types.
Know when/how to switch between broadcast join and distributed join.
Know how to control the parallelism of each join type.

Workshop:

1. Join Types

What are the join types in Drill? This is one of the most popular DBA interview questions, and people with different backgrounds answer it from different angles.
  • SQL Developer: Inner Join, Left/Right/Full Outer Join.
  • Traditional DBA: Hash Join, Sort-Merge Join, Nested Loops.
  • Admin of Hadoop/MPP products: Broadcast Join, Distributed Join. (This is the focus of this article.)
All of the above answers are correct, each in its own dimension.

2. What are Broadcast Join and Distributed Join?

  • Broadcast Join
All of the selected records of one side of the join (typically the smaller table) are broadcast to every other node before the join is performed.
Used for:  Hash Join, Sort-Merge Join, Nested Loops.
                 Does not work for right/full outer joins; works for inner and left outer joins.
Use Case:  A large (fact) table joined to a smaller (dimension) table. (A minimal example follows this list.)
  • Distributed Join
Both sides of the join are hash-distributed on the join key using one of the hash-based distribution operators.
Used for: Hash Join, Sort-Merge Join.
Use Case:  Joining two large tables.
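To make the broadcast use case concrete, here is a minimal sketch; the table names and paths are hypothetical, not part of the workshop dataset:

select f.order_id, d.region_name
from dfs.tmp.`orders_fact` f        /* large fact table keeps its distribution */
join dfs.tmp.`region_dim` d         /* small dimension table is broadcast to all nodes */
on f.region_id = d.region_id;

With the dimension side well under planner.broadcast_threshold, the planner would normally pick a broadcast join here.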

3. How to switch between Broadcast Join and Distributed Join?

Broadcast join is enabled by default via the configuration "planner.enable_broadcast_join".
The SQL planner automatically chooses the join type based on the estimated cost.
The two configurations below can be used to influence that choice manually.
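To take the planner out of the picture entirely, the toggle named above can also be flipped for the session (syntax as used throughout this workshop):

alter session set `planner.enable_broadcast_join` = false;
-- restore the default afterwards:
alter session set `planner.enable_broadcast_join` = true;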

a. planner.broadcast_factor

Per Drill 1.0 source code, this configuration's logic is in BroadcastExchangePrel.java:
$ grep broadcastFactor exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
    final double broadcastFactor = PrelUtil.getSettings(getCluster()).getBroadcastFactor();
    final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows ;
    final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
broadcastFactor scales the estimated CPU and network cost of the broadcast.
The higher it is, the more expensive a broadcast join appears to the planner, so Drill prefers a distributed join; the lower it is, the more Drill favors the broadcast join.
Take the join SQL below as an example:
explain plan including all attributes for 
select count(*) from
(
select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
where a.columns[1]=b.columns[1]
);
When planner.broadcast_factor is at its default of 1, this is the SQL plan with estimated costs:
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.946839431E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22837
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.94683943E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22836
00-02        UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.94683931E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22835
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.94683923E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22834
01-02            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.35632847E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22833
01-03              HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.15949155E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22832
01-05                Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22827
01-07                  Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22826
01-04                Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 22831
01-06                  BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 22830
02-01                    Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22829
02-02                      Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22828
First, note that each cost shown is a cumulative cost, which includes the costs of its child operators. For the physical operator "BroadcastExchange", its own CPU cost is 19991192 - 7688920 = 12302272, and its network cost is 18896289792 - 0 = 18896289792.
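As a consistency check of the source formulas against these plan numbers (a back-of-envelope calculation; the constant values below are inferred from the arithmetic, not documented):

cpuCost     = broadcastFactor * SVR_CPU_COST * inputRows
            = 1 * 8 * 1537784 = 12302272          -- so SVR_CPU_COST is evidently 8 in this build
networkCost = broadcastFactor * BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints
            = 1 * 12288 * 1537784 = 18896289792   -- BYTE_NETWORK_COST * rowWidth * numEndPoints = 12288 on this cluster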

Now let's reduce planner.broadcast_factor from 1 to 0.5:
> alter session set `planner.broadcast_factor`=0.5;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | planner.broadcast_factor updated.  |
+-------+------------------------------------+
1 row selected (0.068 seconds)
Below is the SQL plan for the same query:
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.885328071E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23815
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.88532807E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23814
00-02        UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.88532795E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23813
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.88532787E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23812
01-02            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.29481711E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23811
01-03              HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.09798019E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23810
01-05                Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23805
01-07                  Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23804
01-04                Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 9.448144896E9 network, 0.0 memory}, id = 23809
01-06                  BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 9.448144896E9 network, 0.0 memory}, id = 23808
02-01                    Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23807
02-02                      Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23806
We can see that the BroadcastExchange CPU cost is now 13840056 - 7688920 = 6151136, and the network cost is 9448144896 - 0 = 9448144896.
Both estimated costs are half of the previous values, which matches the source code logic.
Now let's increase planner.broadcast_factor to 2.
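The session command (not shown in the original capture; syntax follows the earlier alter session example):

alter session set `planner.broadcast_factor`=2;

The SQL plan then changes to a distributed join: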
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.85940901E7 rows, 3.180145181E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24814
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.859409E7 rows, 3.18014518E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24813
00-02        UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594089E7 rows, 3.18014506E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24812
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594088E7 rows, 3.18014498E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24811
01-02            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {4.3673165E7 rows, 2.58963422E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24810
01-03              HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {3.8752242E7 rows, 2.3927973E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24809
01-05                Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 24801
01-07                  HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 24800
02-01                    UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.9683692E7 rows, 4.920923E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24799
04-01                      Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.4762769E7 rows, 4.4288307E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24798
04-02                        Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24797
04-03                          Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24796
01-04                Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24808
01-06                  Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24807
01-08                    HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24806
03-01                      UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {6151136.0 rows, 1.537784E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24805
05-01                        Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24804
05-02                          Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24803
05-03                            Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24802
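Before moving on to the threshold experiments, set the factor back to its default of 1 so it does not skew the next results (the original post does not show this step, but the later output assumes the default):

alter session set `planner.broadcast_factor`=1;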

b. planner.broadcast_threshold

Per Drill 1.0 source code, this configuration's logic is in JoinPruleBase.java:
  protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {

    double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
    if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
        && ! left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
        && (join.getJoinType() == JoinRelType.INNER || join.getJoinType() == JoinRelType.LEFT)
        ) {
      return true;
    }
    return false;
  }
A broadcast join can be chosen only if the estimated row count of the smaller (right) side is below planner.broadcast_threshold (10000000 rows by default).
It also only supports inner and left outer joins, which means right/full outer joins are never eligible.
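To illustrate the join-type restriction with the workshop tables, a right outer join such as the sketch below would never be planned as a broadcast join, no matter how small the right side is:

select count(*)
from dfs.root.`user/hive/warehouse/passwords_csv_middle` a
right outer join dfs.root.`user/hive/warehouse/passwords_csv_2` b
on a.columns[1] = b.columns[1];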

In the earlier EXPLAIN example, the estimated row count of the smaller table is 1537784.0, and a broadcast join is chosen when planner.broadcast_factor is at its default of 1.
Now let's see what happens if we decrease planner.broadcast_threshold to exactly 1537784.
0: jdbc:drill:zk=> alter session set `planner.broadcast_threshold`=1537784;
+-------+---------------------------------------+
|  ok   |                summary                |
+-------+---------------------------------------+
| true  | planner.broadcast_threshold updated.  |
+-------+---------------------------------------+
1 row selected (0.184 seconds)
0: jdbc:drill:zk=> explain plan including all attributes for
. . . . . . . . . . . . . . . . . . . . . . .> select count(*) from
. . . . . . . . . . . . . . . . . . . . . . .> (
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1]
. . . . . . . . . . . . . . . . . . . . . . .> );
+------+------+
| text | json |
+------+------+
| 00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.85940901E7 rows, 3.180145181E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25637
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.859409E7 rows, 3.18014518E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25636
00-02        UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594089E7 rows, 3.18014506E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25635
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594088E7 rows, 3.18014498E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25634
01-02            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {4.3673165E7 rows, 2.58963422E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25633
01-03              HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {3.8752242E7 rows, 2.3927973E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25632
01-05                Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 25624
01-07                  HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 25623
02-01                    UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.9683692E7 rows, 4.920923E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25622
04-01                      Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.4762769E7 rows, 4.4288307E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25621
04-02                        Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25620
04-03                          Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25619
01-04                Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25631
01-06                  Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25630
01-08                    HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25629
03-01                      UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {6151136.0 rows, 1.537784E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25628
05-01                        Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25627
05-02                          Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25626
05-03                            Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25625
Because the check in checkBroadcastConditions is a strict less-than, a threshold equal to the estimated row count rules the broadcast out, and the plan above falls back to HashToRandomExchange (a distributed join). If we increase planner.broadcast_threshold to 1537785, the broadcast join can be chosen:
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> alter session set `planner.broadcast_threshold`=1537785;
+-------+---------------------------------------+
|  ok   |                summary                |
+-------+---------------------------------------+
| true  | planner.broadcast_threshold updated.  |
+-------+---------------------------------------+
1 row selected (0.067 seconds)
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> explain plan including all attributes for
. . . . . . . . . . . . . . . . . . . . . . .> select count(*) from
. . . . . . . . . . . . . . . . . . . . . . .> (
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1]
. . . . . . . . . . . . . . . . . . . . . . .> );
+------+------+
| text | json |
+------+------+
| 00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.946839431E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27438
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.94683943E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27437
00-02        UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.94683931E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27436
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.94683923E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27435
01-02            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.35632847E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27434
01-03              HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.15949155E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27433
01-05                Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27428
01-07                  Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27427
01-04                Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 27432
01-06                  BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 27431
02-01                    Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27430
02-02                      Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27429

4. How to control the parallelism of the above 2 join types?

The tradeoff is always between performance and resource capacity: a query generally executes faster when given more system resources.
Assuming the cluster has enough resources, the simplest way to speed up a query is to increase its parallelism.
Before reading any further, it is suggested to first understand the basics of controlling query parallelization.
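The two session options the tests below exercise can be checked up front; a minimal sketch using the standard sys.options table:

select name, num_val
from sys.options
where name in ('planner.width.max_per_node', 'planner.slice_target');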

The tests below join a fact table to a dimension table and examine the factors that affect the parallelism of each join type.
To give all cluster resources to each SQL, I manually increased planner.width.max_per_node to 999 (large enough; see the command after the list below).
Here are the table size and row number:
  • Fact table A: 734M with 20 million rows.  (1 CSV file * 734M)
  • Fact table B: 734M with 20 million rows.  (10 CSV files * 73.4M)
  • Dimension table C: 147M with 4 million rows. (2 CSV files * 73.4M)
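The session command for the width setting mentioned above (999 is simply a "large enough" value for this workshop, not a production recommendation):

alter session set `planner.width.max_per_node`=999;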
Test 1: A joins C using broadcast join
Note that a broadcast join broadcasts the right side of the join and keeps the left side's distribution as-is, which means the join's parallelism depends on the left-side scan.
The parallelism of this join is 3 because the MFS default chunk size is 256M and fact table A is a single 734MB file, so there are ceil(734/256) = 3 splits.
The query takes 16 seconds to finish.
Test 2: B joins C using broadcast join
The join type and table sizes are the same; the only difference is that fact table B has 10 files.
Now the parallelism is 10 because fact table B has 10 files, and each file is smaller than the MFS chunk size.
The query takes 14 seconds to finish.
Test 3: A joins C using distributed join.
Compared to test 1, the distributed join's parallelism is 77. This is because planner.slice_target is 100K rows by default, and the estimated row count of fact table A in the physical plan is "rowcount = 7688921.0", so ceil(7688921 / 100000) = 77 minor fragments are spawned.
Please note: major fragment "04-xx-xx" scans A, and its parallelism is still 3.
The query takes 8 seconds to finish.
Test 4: B joins C using distributed join.
Compared to test 3, the distributed join also spawns 77 minor fragments because fact tables A and B have the same row count.
Please note: major fragment "04-xx-xx" scans B, and its parallelism is 10 because it has 10 files.
The query takes 7 seconds to finish.

In summary:
  • The parallelism of a broadcast join is determined by the fact table's size and file count (the same as the fact table scan's parallelism).
  • The parallelism of a distributed join is determined by "planner.slice_target" and the larger table's row count.
Digging into these two conclusions, it becomes clear that the parallelism of both join types is ultimately related to the row width of the data.
With the same data size and the same number of files:
  • The smaller the row width, the more rows, and hence the more parallelism for a distributed join.
  • The larger the row width, the more parallelism a broadcast join has relative to a distributed join.
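A quick numeric illustration using the figures from the tests above (an informal check, not an additional measurement): distributed-join parallelism tracks row count, e.g. ceil(7688921 / 100000) = 77 minor fragments, while broadcast-join parallelism tracks bytes and files, e.g. ceil(734MB / 256MB) = 3 splits for single-file table A. Halving the row width roughly doubles the row count for the same bytes, doubling the former while leaving the latter unchanged.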

