Friday, April 17, 2015

Understanding the spill file index in MapReduce V2

Env

Hadoop 2.x
MapReduce V2

Goal

This article explains when MapReduce V2 starts to generate spill file index files.
We will explore the Hadoop 2.x source code to derive the theory and run some tests to verify it.

Theory

1. Concept of Spill File

Each mapper has a memory buffer for sorting; by default its size is 100 MB (mapreduce.task.io.sort.mb). When the utilization of the buffer reaches a threshold (mapreduce.map.sort.spill.percent, 0.80 by default), SpillThread starts spilling the buffer contents to disk.
Spill file names look like spill1.out, spill2.out, etc.
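As a quick back-of-the-envelope check, the spill trigger point can be computed from the two settings above. This is just a sketch of the arithmetic using the stock Hadoop 2.x defaults, not code from MapTask.java:

```java
public class SpillThreshold {
    public static void main(String[] args) {
        // Hadoop 2.x defaults: mapreduce.task.io.sort.mb = 100 (MB),
        // mapreduce.map.sort.spill.percent = 0.80
        long sortBufferBytes = 100L * 1024 * 1024;
        double spillPercent = 0.80;

        // SpillThread kicks in once buffer utilization crosses this mark
        long spillTriggerBytes = (long) (sortBufferBytes * spillPercent);
        System.out.println("Spill starts at ~" + spillTriggerBytes + " bytes");
        // i.e. roughly 80 MB of the 100 MB sort buffer
    }
}
```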

2. Concept of Spill file index

Each mapper also has a memory buffer for spill file indexes; by default its size is 1 MB (mapreduce.task.index.cache.limit.bytes = 1048576). When this buffer is full, each subsequent spill file index is written to disk alongside its corresponding spill file.
Spill file index names look like spill110.out.index, spill111.out.index, etc.

3. Data structure of Spill file index

Per the Hadoop source code (MapTask.java), the spill file index cache is actually a list of "SpillRecord" objects:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();
Per SpillRecord.java, each "SpillRecord" allocates the following memory when constructed:
  public SpillRecord(int numPartitions) {
    buf = ByteBuffer.allocate(
        numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
    entries = buf.asLongBuffer();
  }

3.1 What is the value of "numPartitions"?

Per MapTask.java, "numPartitions" is the number of reducers:
  partitions = jobContext.getNumReduceTasks();
  final SpillRecord spillRec = new SpillRecord(partitions);

3.2 What is the value of "MAP_OUTPUT_INDEX_RECORD_LENGTH"?

Per MapTask.java, "MAP_OUTPUT_INDEX_RECORD_LENGTH" is a constant -- 24 (bytes):
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

In summary, the memory needed for one spill file index is:
Number_Of_Reducers * 24 (Bytes)
Since the spill file index memory buffer is 1 MB = 1,048,576 bytes by default, spill file indexes should start appearing on disk when the number of spill files reaches:
1048576 / Number_Of_Reducers / 24
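Putting 3.1 and 3.2 together, the arithmetic can be sketched in plain Java. The constant name mirrors MapTask.java; the helper methods are illustrative, not Hadoop APIs:

```java
public class IndexCacheMath {
    // Each index record is 3 longs: start offset, raw length, compressed length
    static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

    /** Bytes one SpillRecord occupies in the index cache. */
    static long spillRecordBytes(int numReducers) {
        return (long) numReducers * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }

    /** How many spill files fit in the cache before indexes go to disk. */
    static long spillsBeforeIndexOnDisk(long indexCacheLimitBytes, int numReducers) {
        return indexCacheLimitBytes / spillRecordBytes(numReducers);
    }

    public static void main(String[] args) {
        long limit = 1048576L; // default mapreduce.task.index.cache.limit.bytes (1 MB)
        // With 400 reducers: 1048576 / (400 * 24) = 109 spill files
        System.out.println(spillsBeforeIndexOnDisk(limit, 400));
    }
}
```

So with 400 reducers and the default 1 MB cache, the 110th spill should be the first one whose index lands on disk (e.g. spill110.out.index, counting from spill1.out).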

Tests

The goal of the tests is to verify the above theory and confirm when the spill file index starts to be written to disk. Basically, the tests record after how many spill files the first spill file index shows up.
To make the test results comparable, we need to limit this MapReduce job to one mapper, and then control the number of reducers.

1. Limit to one mapper

By default, the number of mappers = the number of HDFS blocks (MFS chunks in MapR).
To give a single mapper enough data to sort so that the spill file index can be reproduced, we used Hive to compress a 48 GB text table into a 28 MB ORC table, so only one mapper is triggered and its behavior is easy to watch.

2. Control the number of reducers.

Pig can set default_parallel to control the number of reducers easily.

Putting it together, one sample Pig job for the tests is:
pig -useHCatalog

set mapreduce.task.io.sort.mb 10;
set mapreduce.map.memory.mb 2048;
set mapreduce.map.java.opts '-Xmx2g';
set default_parallel 400;
set mapreduce.task.index.cache.limit.bytes 1048576;
A = LOAD 'passwords_orc' USING org.apache.hcatalog.pig.HCatLoader();
B = GROUP A BY col0;
C = limit B 3;
DUMP C;

Test Result

The test results match the theory perfectly.

Key Takeaways

  • The spill file index starts to be written to disk after this many spill files are generated:
mapreduce.task.index.cache.limit.bytes / Number_Of_Reducers / 24
  • Increase mapreduce.task.index.cache.limit.bytes to improve a MapReduce job's performance once spill file indexes start showing up on disk.
  • Reducing the number of reducers also reduces the memory needed to buffer spill file indexes.
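To apply the first takeaway programmatically, the limit can be raised through the standard Hadoop Configuration API before submitting a job. A minimal sketch (the 4 MB value is illustrative, not a recommendation):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TuneIndexCache {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Raise the index cache from the 1 MB default to 4 MB so that
        // spill file indexes stay in memory longer.
        conf.setInt("mapreduce.task.index.cache.limit.bytes", 4 * 1024 * 1024);
        Job job = Job.getInstance(conf, "tuned-job");
        // ... set mapper/reducer classes, input/output paths, then submit ...
    }
}
```

The same setting can of course be applied cluster-wide in mapred-site.xml, or per job via `set` in Pig as shown in the test script above.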
Reference:
Anatomy of a MapReduce Job
