Thursday, July 16, 2015

Drill Workshop -- Query parallelization for parquet files

Env:

Drill 1.1

Theory:

The parallelization of scanning Parquet files depends on the number of Parquet row groups.
The parallelization of joining Parquet files is the same as the theory mentioned in Drill Workshop -- Understanding Joins.

Goal: 

Verify the scan and join parallelization of parquet files.

Workshop:

1.  Prepare parquet files with different row group sizes.

The data source is a CSV table in Hive -- passwords_csv_big.
Note: With Drill we can create parquet files that contain one row group each, so the number of parquet files equals the number of row groups.
ALTER SESSION SET `store.format` = 'parquet';
ALTER SESSION SET `store.parquet.block-size` = 536870912;    
create table dfs.drill.parq_512m as select * from dfs.root.`user/hive/warehouse/passwords_csv_big`;   

ALTER SESSION SET `store.parquet.block-size` = 134217728;    
create table dfs.drill.parq_128m as select * from dfs.root.`user/hive/warehouse/passwords_csv_big`;   

ALTER SESSION SET `store.parquet.block-size` = 67108864;    
create table dfs.drill.parq_64m as select * from dfs.root.`user/hive/warehouse/passwords_csv_big`;   

ALTER SESSION SET `store.parquet.block-size` = 33554432;    
create table dfs.drill.parq_32m as select * from dfs.root.`user/hive/warehouse/passwords_csv_big`;    
Below are the size and number of files (same as the number of row groups) for the above 4 tables:

Table Name   Size   File Number
parq_512m    4.2G   18
parq_128m    4.2G   43
parq_64m     4.2G   77
parq_32m     4.2G   144

2. Parallelization for scanning parquet files.

First, increase the parallelization width limits so that they will not affect the test results.
Then run the queries below in Drill and check the parallelization of the scan.
ALTER SESSION SET `planner.width.max_per_node`=9999;
ALTER SESSION SET `planner.width.max_per_query`=9999;
select count(*) from (select * from dfs.drill.parq_512m) group by col0,col2;   
select count(*) from (select * from dfs.drill.parq_128m) group by col0,col2;   
select count(*) from (select * from dfs.drill.parq_64m) group by col0,col2;   
select count(*) from (select * from dfs.drill.parq_32m) group by col0,col2; 
The results are:

Table Name   Size   File Number   Scan Parallelization
parq_512m    4.2G   18            18
parq_128m    4.2G   43            43
parq_64m     4.2G   77            77
parq_32m     4.2G   144           144
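The pattern above can be captured in a minimal model (a sketch, not Drill's actual planner code; the function name and the assumption that no other limit kicks in are mine): scan parallelization for parquet is the number of row groups, capped by the width settings.

```python
# Hypothetical sketch of Drill's parquet scan parallelization:
# each row group is scanned by at most one minor fragment,
# capped by the planner width limits set above.
def scan_parallelization(row_groups, max_per_query=9999, max_per_node=9999, nodes=1):
    return min(row_groups, max_per_query, max_per_node * nodes)

for table, groups in [("parq_512m", 18), ("parq_128m", 43),
                      ("parq_64m", 77), ("parq_32m", 144)]:
    print(table, scan_parallelization(groups))
```

With the limits raised to 9999 as in this test, the cap never applies, which is why the scan parallelization exactly matches the row-group count in the table above.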

3. Parallelization for joining parquet files.

As mentioned in blog Drill Workshop -- Understanding Joins:
  • The parallelism of a broadcast join is decided by the fact table's size and file number (the same as the fact table scan's parallelism).
  • The parallelism of a distributed join is decided by "planner.slice_target" and the larger table's row count.
This test treats each of the above 4 tables as the fact table in turn, joins it with a small dimension table, and checks the join parallelization in two situations -- broadcast join and distributed join.
One example of broadcast join is:
explain plan including all attributes for 
select count(*) from
(
select a.* from dfs.drill.parq_512m a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
where a.columns[1]=b.columns[1]
);
One example of distributed join is:
ALTER SESSION set `planner.enable_broadcast_join`=false;
explain plan including all attributes for 
select count(*) from
(
select a.* from dfs.drill.parq_512m a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
where a.columns[1]=b.columns[1]
);
The test results are:

Table Name   Size   File Number   Broadcast Join Parallelization   Distributed Join Parallelization
parq_512m    4.2G   18            18                               1280
parq_128m    4.2G   43            43                               1280
parq_64m     4.2G   77            77                               1280
parq_32m     4.2G   144           144                              1280

Note: the distributed join always gets a parallelization of 1280 because:
  1. The estimated row count for all the parquet tables is "rowcount = 1.28E8" per the physical plan.
  2. planner.slice_target=100000 by default.
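The two facts above combine as a back-of-the-envelope calculation (a rough model, not the planner's full logic):

```python
import math

# Distributed-join width is roughly the estimated row count
# divided by the slice target.
rowcount = 1.28e8        # "rowcount = 1.28E8" per the physical plan
slice_target = 100000    # planner.slice_target default
width = math.ceil(rowcount / slice_target)
print(width)  # -> 1280
```

Since 1280 is well below the 9999 width limits set earlier, the limits never cap it, and the result is the same for all 4 tables regardless of their row-group counts.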
