Sunday, May 17, 2015

Drill Workshop -- Partition Pruning

Env:

Drill 1.0

Theory: 

Partition pruning is a performance optimization that limits the number of files and partitions that Drill reads when querying file systems and Hive tables.
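
To make the idea concrete, here is a minimal Python sketch of what pruning amounts to for a directory layout: derive a value for each directory level from the file path, then keep only the files whose directory values satisfy the filter. This is an illustration only, not Drill's implementation. The function names dir_columns and prune are made up for this example, and the paths are the ones used in the workshop below.

```python
from pathlib import PurePosixPath

def dir_columns(root, path, label="dir"):
    """Map each directory level between root and the file to label0, label1, ..."""
    rel = PurePosixPath(path).relative_to(root)
    # The last path component is the file name, so it is not a directory level.
    return {f"{label}{i}": part for i, part in enumerate(rel.parts[:-1])}

def prune(root, files, predicate):
    """Keep only the files whose directory columns satisfy the predicate."""
    return [f for f in files if predicate(dir_columns(root, f))]

FILES = [
    "/drill/part1/2014/11/11.csv",
    "/drill/part1/2014/12/12.csv",
    "/drill/part1/2015/01/01.csv",
    "/drill/part1/2015/02/02.csv",
    "/drill/part1/2015/03/03.csv",
]

# Same condition as: dir0='2015' and (dir1 >= '02' and dir1 <= '03')
selected = prune(
    "/drill/part1",
    FILES,
    lambda d: d["dir0"] == "2015" and "02" <= d["dir1"] <= "03",
)
print(selected)
```

Only the 2015/02 and 2015/03 files survive, so a reader that prunes this way never opens the other three files.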

Goal:

Know how to use the query plan to verify whether a query is optimized by partition pruning.
Know how to change the prefix of the directory columns.

Workshop:

1. Querying data in directories

Assume the CSV data is organized in the directory structure below:
[root@h1 part1]# hadoop fs -lsr /drill/part1
drwxr-xr-x   - root root          2 2015-05-17 18:35 /drill/part1/2014
drwxr-xr-x   - root root          1 2015-05-17 18:35 /drill/part1/2014/11
-rw-r--r--   3 root root          9 2015-05-17 18:35 /drill/part1/2014/11/11.csv
drwxr-xr-x   - root root          1 2015-05-17 18:36 /drill/part1/2014/12
-rw-r--r--   3 root root          9 2015-05-17 18:36 /drill/part1/2014/12/12.csv
drwxr-xr-x   - root root          3 2015-05-17 18:35 /drill/part1/2015
drwxr-xr-x   - root root          1 2015-05-17 18:37 /drill/part1/2015/01
-rw-r--r--   3 root root          8 2015-05-17 18:37 /drill/part1/2015/01/01.csv
drwxr-xr-x   - root root          1 2015-05-17 18:37 /drill/part1/2015/02
-rw-r--r--   3 root root          8 2015-05-17 18:37 /drill/part1/2015/02/02.csv
drwxr-xr-x   - root root          1 2015-05-17 18:37 /drill/part1/2015/03
-rw-r--r--   3 root root          8 2015-05-17 18:37 /drill/part1/2015/03/03.csv
Querying the directory "part1" returns all the data:
> select * from dfs.drill.`part1`;
+-----------------+-------+-------+
|     columns     | dir0  | dir1  |
+-----------------+-------+-------+
| ["1","'111'"]   | 2015  | 01    |
| ["2","'234'"]   | 2015  | 02    |
| ["3","'3rd'"]   | 2015  | 03    |
| ["11","'abc'"]  | 2014  | 11    |
| ["12","'def'"]  | 2014  | 12    |
+-----------------+-------+-------+
5 rows selected (0.116 seconds)
If we only need the data for February and March 2015:
> select * from dfs.drill.`part1` where dir0='2015' and (dir1 >= '02' and dir1 <= '03');
+----------------+-------+-------+
|    columns     | dir0  | dir1  |
+----------------+-------+-------+
| ["2","'234'"]  | 2015  | 02    |
| ["3","'3rd'"]  | 2015  | 03    |
+----------------+-------+-------+
2 rows selected (0.128 seconds)
Now check the physical plan of the above SQL to confirm that it scans only the 2 matching files (numFiles=2) instead of every file under part1.
> explain plan for select * from dfs.drill.`part1` where dir0='2015' and (dir1 >= '02' and dir1 <= '03');
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(*=[$0])
00-02        Project(*=[$0])
00-03          Scan(groupscan=[EasyGroupScan [selectionRoot=/drill/part1, numFiles=2, columns=[`*`], files=[maprfs:/drill/part1/2015/02/02.csv, maprfs:/drill/part1/2015/03/03.csv]]])
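
Reading numFiles and files by eye works for a small plan. For longer plans, the same check can be scripted. The Python sketch below is a hypothetical helper (scanned_files is not part of Drill) that assumes the plan text uses the numFiles=... and files=[...] layout shown above, which may differ in other Drill versions.

```python
import re

def scanned_files(plan_text):
    """Return (numFiles, [file paths]) from the scan attributes in a text plan."""
    num = re.search(r"numFiles=(\d+)", plan_text)
    files = re.search(r"files=\[([^\]]*)\]", plan_text)
    if not num or not files:
        raise ValueError("no numFiles/files attributes found in plan")
    paths = [p.strip() for p in files.group(1).split(",") if p.strip()]
    return int(num.group(1)), paths

# Text plan copied from the explain output above.
PLAN = """00-00    Screen
00-01      Project(*=[$0])
00-02        Project(*=[$0])
00-03          Scan(groupscan=[EasyGroupScan [selectionRoot=/drill/part1, numFiles=2, columns=[`*`], files=[maprfs:/drill/part1/2015/02/02.csv, maprfs:/drill/part1/2015/03/03.csv]]])"""

count, paths = scanned_files(PLAN)
print(count)
for p in paths:
    print(p)
```

If pruning had not happened, count would be 5 and paths would list every CSV file under /drill/part1.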

2. Querying data in Hive partition table

Assume we create the Hive partitioned table below:
CREATE TABLE partition_table(id INT, username string)
PARTITIONED BY(year STRING, month STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ",";

insert into table partition_table PARTITION(year='2014',month='11') select 1,'u' from passwords limit 1;
insert into table partition_table PARTITION(year='2014',month='12') select 2,'s' from passwords limit 1;
insert into table partition_table PARTITION(year='2015',month='01') select 3,'e' from passwords limit 1;
insert into table partition_table PARTITION(year='2015',month='02') select 4,'r' from passwords limit 1;
insert into table partition_table PARTITION(year='2015',month='03') select 5,'n' from passwords limit 1;
Querying the Hive table from Drill returns all the data:
> select * from hive.partition_table;
+-----+-----------+-------+--------+
| id  | username  | year  | month  |
+-----+-----------+-------+--------+
| 1   | u         | 2014  | 11     |
| 2   | s         | 2014  | 12     |
| 3   | e         | 2015  | 01     |
| 4   | r         | 2015  | 02     |
| 5   | n         | 2015  | 03     |
+-----+-----------+-------+--------+
5 rows selected (0.797 seconds)
If we only need the data for February and March 2015:
> select * from hive.partition_table where `year`='2015' and (`month` >= '02' and `month` <= '03');
+-----+-----------+-------+--------+
| id  | username  | year  | month  |
+-----+-----------+-------+--------+
| 4   | r         | 2015  | 02     |
| 5   | n         | 2015  | 03     |
+-----+-----------+-------+--------+
2 rows selected (0.374 seconds)
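Now check the physical plan of the above SQL. It actually scans 3 partitions instead of 2: the equality on `year` is used for pruning, but the month=01 partition is still read and its rows are only discarded afterwards by the Filter operator.
(Filed DRILL-3121 for this.)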
> explain plan for select * from hive.partition_table where `year`='2015' and (`month` >= '02' and `month` <= '03');
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(id=[$0], username=[$1], year=[$2], month=[$3])
00-02        SelectionVectorRemover
00-03          Filter(condition=[AND(=($2, '2015'), >=($3, '02'), <=($3, '03'))])
00-04            Scan(groupscan=[HiveScan [table=Table(dbName:default, tableName:partition_table), inputSplits=[maprfs:/user/hive/warehouse/partition_table/year=2015/month=01/000000_0:0+4, maprfs:/user/hive/warehouse/partition_table/year=2015/month=02/000000_0:0+4, maprfs:/user/hive/warehouse/partition_table/year=2015/month=03/000000_0:0+4], columns=[`*`], partitions= [Partition(values:[2015, 01]), Partition(values:[2015, 02]), Partition(values:[2015, 03])]]])
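
The extra partition is easy to miss in a long HiveScan line. The Python sketch below lists the partitions named in the plan and reports those that the WHERE clause does not need. Both function names are hypothetical, and the parsing assumes the Partition(values:[...]) layout shown above.

```python
import re

def scanned_partitions(plan_text):
    """Return the value lists of all Partition(values:[...]) entries in the plan."""
    return [
        [v.strip() for v in match.split(",")]
        for match in re.findall(r"Partition\(values:\[([^\]]*)\]\)", plan_text)
    ]

def unneeded(partitions, predicate):
    """Return the scanned partitions that the filter would reject anyway."""
    return [p for p in partitions if not predicate(p)]

# The partitions= attribute copied from the HiveScan line above.
PLAN_SCAN = (
    "partitions= [Partition(values:[2015, 01]), "
    "Partition(values:[2015, 02]), Partition(values:[2015, 03])]"
)

parts = scanned_partitions(PLAN_SCAN)
# Same condition as: year='2015' and (month >= '02' and month <= '03')
extra = unneeded(parts, lambda p: p[0] == "2015" and "02" <= p[1] <= "03")
print(extra)  # [['2015', '01']]
```

An empty result would mean the scan reads exactly the partitions the query needs.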

3. Changing the prefix of the directory columns

What happens if the data itself contains columns named "dir0", "dir1", etc.?
The query fails with "java.lang.IndexOutOfBoundsException". See DRILL-3118.
This is because drill.exec.storage.file.partition.column.label is set to 'dir' by default, so such data columns collide with the directory column names:
> select * from sys.options where name = 'drill.exec.storage.file.partition.column.label';
+-------------------------------------------------+---------+---------+----------+----------+-------------+-----------+------------+
|                      name                       |  kind   |  type   |  status  | num_val  | string_val  | bool_val  | float_val  |
+-------------------------------------------------+---------+---------+----------+----------+-------------+-----------+------------+
| drill.exec.storage.file.partition.column.label  | STRING  | SYSTEM  | DEFAULT  | null     | dir         | null      | null       |
+-------------------------------------------------+---------+---------+----------+----------+-------------+-----------+------------+
To work around the issue, change drill.exec.storage.file.partition.column.label to a prefix that does not match any column name in the data:
> alter session set `drill.exec.storage.file.partition.column.label`='directory';
+-------+----------------------------------------------------------+
|  ok   |                         summary                          |
+-------+----------------------------------------------------------+
| true  | drill.exec.storage.file.partition.column.label updated.  |
+-------+----------------------------------------------------------+
1 row selected (0.074 seconds)
After the change, a data column named "dir999" can be selected from the file:
> select `dir999` from dfs.root.`user/hive/warehouse/testdir999/3d49fc1fd0bc7e81-e6c5bb9affac8684_358897896_data.parquet`;
+--------------+
|    dir999    |
+--------------+
| [B@5e5af20c  |
+--------------+
1 row selected (0.098 seconds)
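
Before picking a new prefix, it helps to check that it does not collide with any column in the data. The Python sketch below flags data columns that look like the label followed by digits. It is a hypothetical helper, not something Drill provides; the candidate labels other than 'dir' and 'directory' are invented for the example, and the case-insensitive match is a deliberately conservative assumption.

```python
import re

def colliding_columns(data_columns, label="dir"):
    """Return the data column names of the form <label><digits>."""
    # Case-insensitive on purpose: flagging too much is safer than too little.
    pattern = re.compile(re.escape(label) + r"\d+", re.IGNORECASE)
    return [c for c in data_columns if pattern.fullmatch(c)]

def pick_label(data_columns, candidates=("dir", "directory", "partition_dir")):
    """Return the first candidate label with no collisions, or None."""
    for label in candidates:
        if not colliding_columns(data_columns, label):
            return label
    return None

# Hypothetical column list for a file that has a data column named dir999.
columns = ["id", "dir999"]

print(colliding_columns(columns))               # collides with the default label
print(colliding_columns(columns, "directory"))  # no collision
print(pick_label(columns))
```

For this column list the default label collides and 'directory' does not, which matches the workaround used above.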

Reference:

Querying Directories
Partition Pruning(Drill Wiki)
Configuration Options Introduction
