Thursday, January 15, 2015

How to enable Fetch Task instead of MapReduce Job for simple query in Hive

Goal:

Certain simple Hive queries can be served by a fetch task, which avoids the overhead of starting a MapReduce job.
This article explains how to enable this feature.

Solution:

1. hive.fetch.task.conversion

This parameter controls which kinds of simple queries can be converted to a single fetch task.
It was added in Hive 0.10 per HIVE-2925.
  • Value "none" was added in Hive 0.14 to disable this feature, per HIVE-8389.
  • Value "minimal" means SELECT *, FILTER on partition columns (WHERE and HAVING clauses), LIMIT only.
In this case, "select * from table limit 1" may finish very quickly, whereas "select col0 from table limit 1" may take longer because it needs to start a MapReduce job.
Compare the explain plans below:
hive> set hive.fetch.task.conversion;
hive.fetch.task.conversion=minimal
hive> explain select * from passwords limit 1;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: 1
      Processor Tree:
        TableScan
          alias: passwords
          Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Limit
              Number of rows: 1
              Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
              ListSink

hive> explain select col0 from passwords limit 1;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col0 (type: string)
              outputColumnNames: _col0
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
              Limit
                Number of rows: 1
                Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 1
  • Value "more" means SELECT, FILTER, LIMIT only (including TABLESAMPLE, virtual columns). "more" can take any kind of expression in the SELECT clause, including UDFs. (UDTFs and lateral views are not yet supported; see HIVE-5718.)
"more" was originally introduced to let "select col from table" use a fetch task, per HIVE-887.
See the explain plan below:
hive> set hive.fetch.task.conversion=more;
hive> explain select col0 from passwords ;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: passwords
          Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: col0 (type: string)
            outputColumnNames: _col0
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            ListSink
It also allows "select *" with a filter to use a fetch task, e.g.:
select * from passwords where col0='root';
Use "more" carefully and only after thorough testing, because a fetch task cannot use the parallelism of the MapReduce framework.
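
The mode descriptions above can be summarized as a small decision function. The following Python sketch is only a simplified model of the rules as stated in this article, not Hive's actual optimizer code. The QueryShape class, its fields, and fetch_eligible are hypothetical names introduced for illustration, and the sketch ignores the size threshold (hive.fetch.task.conversion.threshold).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueryShape:
    """Hypothetical summary of a single-table query (not a Hive class)."""
    select_star: bool                       # True for "select *", False for a column/expression list
    filter_on: str = "none"                 # "none", "partition" (partition columns only) or "any"
    has_udtf_or_lateral_view: bool = False
    has_aggregation_or_join: bool = False


def fetch_eligible(mode: str, query: QueryShape) -> bool:
    """Return True if, per the rules described above, the query could become a fetch task."""
    if mode not in ("none", "minimal", "more"):
        raise ValueError(f"unknown hive.fetch.task.conversion value: {mode!r}")
    if mode == "none":
        return False
    # Neither mode covers anything beyond SELECT, FILTER and LIMIT.
    if query.has_aggregation_or_join or query.has_udtf_or_lateral_view:
        return False
    if mode == "minimal":
        # Only "select *", optionally filtered on partition columns.
        return query.select_star and query.filter_on in ("none", "partition")
    # mode == "more": any select list and any filter are allowed.
    return True


# The three query shapes used in this section (LIMIT does not change eligibility in this model).
star = QueryShape(select_star=True)                             # select * from passwords limit 1
one_col = QueryShape(select_star=False)                         # select col0 from passwords
star_filtered = QueryShape(select_star=True, filter_on="any")   # select * ... where col0='root'

for mode in ("none", "minimal", "more"):
    print(mode, fetch_eligible(mode, star), fetch_eligible(mode, one_col),
          fetch_eligible(mode, star_filtered))
```

In this model, and assuming col0 is not a partition column, only the plain "select *" query is eligible under "minimal", while all three are eligible under "more".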

2. hive.fetch.task.aggr

Aggregation queries with no group-by clause (for example, select count(*) from src) execute final aggregations in a single reduce task. If this parameter is set to true, Hive delegates the final aggregation stage to a fetch task, possibly decreasing the query time.
It was added in Hive 0.12 per HIVE-4002.
It can help when the output of the mappers is very small, by saving the time spent on the shuffle and on launching a reduce task.

For example, compare the explain plans below:
hive> set hive.fetch.task.aggr;
hive.fetch.task.aggr=false
hive> explain select count(*) from passwords;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
            Select Operator
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          Select Operator
            expressions: _col0 (type: bigint)
            outputColumnNames: _col0
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            File Output Operator
              compressed: false
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 1.471 seconds, Fetched: 44 row(s)
hive> set hive.fetch.task.aggr=true;
hive> explain select count(*) from passwords;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
            Select Operator
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        Group By Operator
          aggregations: count(_col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          Select Operator
            expressions: _col0 (type: bigint)
            outputColumnNames: _col0
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            ListSink

Time taken: 0.148 seconds, Fetched: 40 row(s)
Here are the performance differences:
hive> set hive.fetch.task.aggr=false;
hive> select count(*) from passwords;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201501081639_0044, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0044
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201501081639_0044
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-01-15 12:10:15,706 Stage-1 map = 0%,  reduce = 0%
2015-01-15 12:10:26,764 Stage-1 map = 47%,  reduce = 0%, Cumulative CPU 6.65 sec
2015-01-15 12:10:27,776 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 8.31 sec
2015-01-15 12:10:32,808 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 9.27 sec
MapReduce Total cumulative CPU time: 9 seconds 270 msec
Ended Job = job_201501081639_0044
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 9.27 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 270 msec
OK
9963904
Time taken: 19.505 seconds, Fetched: 1 row(s)
hive> set hive.fetch.task.aggr=true;
hive> select count(*) from passwords;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0045, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0045
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201501081639_0045
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:10:47,231 Stage-1 map = 0%,  reduce = 0%
2015-01-15 12:10:58,301 Stage-1 map = 47%,  reduce = 0%
2015-01-15 12:10:59,308 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 7.26 sec
MapReduce Total cumulative CPU time: 7 seconds 260 msec
Ended Job = job_201501081639_0045
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 7.26 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 260 msec
OK
9963904
Time taken: 13.817 seconds, Fetched: 1 row(s)
Note that a query with a group-by clause cannot use this feature. For example:
select count(*) from passwords group by col0;
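
To put the two count(*) runs above in numbers, this short Python calculation uses the "Time taken" values from the sessions shown (19.505 seconds with hive.fetch.task.aggr=false, 13.817 seconds with true). It reflects a single measurement on one cluster, so treat the result as illustrative rather than as a benchmark; the variable names are illustrative.

```python
# "Time taken" values copied from the two count(*) runs above.
seconds_with_reducer = 19.505   # hive.fetch.task.aggr=false
seconds_with_fetch = 13.817     # hive.fetch.task.aggr=true

saved = seconds_with_reducer - seconds_with_fetch
reduction = saved / seconds_with_reducer

print(f"saved {saved:.3f} s ({reduction:.1%} less wall-clock time)")
```

For this run, skipping the reduce stage saved roughly 5.7 seconds, about 29% of the original query time.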

3.  hive.fetch.task.conversion.threshold

This parameter sets the input size threshold (in bytes) for applying hive.fetch.task.conversion.
It was added in Hive 0.13 per HIVE-3990, with a default of "-1", which disables the threshold.
The default value was changed in Hive 0.14 to 1 GB (1073741824) per HIVE-7397.

Taking the example above, the size of the "passwords" table is 465 MB:
[root@n3a passwords]# pwd
/mapr/my.cluster.com/user/hive/warehouse/passwords
[root@n3a passwords]# du -sh  .
465M .
If hive.fetch.task.conversion.threshold is less than the table size, Hive uses a MapReduce job:
hive> set hive.fetch.task.conversion.threshold=100000000;
hive> select * from passwords limit 1;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0046, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0046
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -kill job_201501081639_0046
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:19:06,474 Stage-1 map = 0%,  reduce = 0%
2015-01-15 12:19:11,496 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 0.85 sec
MapReduce Total cumulative CPU time: 850 msec
Ended Job = job_201501081639_0046
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 0.85 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 850 msec
OK
root x 0 0 root /root /bin/bash
Time taken: 6.698 seconds, Fetched: 1 row(s)
Otherwise, it uses only a fetch task:
hive> set hive.fetch.task.conversion.threshold=600000000;
hive> select * from passwords limit 1;
OK
root x 0 0 root /root /bin/bash
Time taken: 0.325 seconds, Fetched: 1 row(s)
Note that the threshold is compared against the calculated or estimated table size, not the size of the result set.
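
The behavior shown in this section can be modelled as a simple size comparison. This Python sketch is a simplification under stated assumptions, not Hive's implementation: within_fetch_threshold is a hypothetical helper, a negative threshold is treated as "no limit" (matching the original default of -1), the comparison is assumed to be inclusive at the boundary, and the input size is taken from the "Data size" statistic in the explain plans (477218560 bytes).

```python
def within_fetch_threshold(input_bytes: int, threshold_bytes: int) -> bool:
    """Simplified model of the size check behind hive.fetch.task.conversion.threshold."""
    if threshold_bytes < 0:
        # A negative value (the Hive 0.13 default of -1) means no size limit is applied.
        return True
    # Assumption: an input exactly equal to the threshold still qualifies.
    return input_bytes <= threshold_bytes


# Input size of "passwords", taken from the "Data size" statistic in the explain plans.
passwords_bytes = 477218560

for threshold in (100000000, 600000000, 1073741824, -1):
    if within_fetch_threshold(passwords_bytes, threshold):
        outcome = "fetch task"
    else:
        outcome = "MapReduce job"
    print(f"threshold={threshold}: {outcome}")
```

Only the 100000000 setting falls below the table size, which matches the MapReduce job seen in the first session. The query must still be eligible under hive.fetch.task.conversion for the threshold to matter.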
