Wednesday, April 24, 2019

Some reducer output files are not moved into final output directory after a MapReduce Job with customized FileOutputCommitter is migrated from MRv1 to MRv2.

Symptom:

After a MapReduce Job with customized FileOutputCommitter got migrated from MRv1 to MRv2, some reducer output files are not moved into final output directory.

Take a sample WordCount job with a customized FileOutputCommitter for example.
In this specific case, only 1st reducer's output files are moved into final output directory.
All other reducers' output files are still sitting in the temporary location:
/hao/wordfinal/part_00000/part-r-00000
/hao/wordfinal/part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
/hao/wordfinal/part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002
The expected final output files should be:
/hao/wordfinal/part_00000/part-r-00000
/hao/wordfinal/part_00001/part-r-00001
/hao/wordfinal/part_00002/part-r-00002
Note: The same job works fine in MRv1 though before migrating to MRv2.

Env:

The MapReduce job is migrated from:
MapR 5.x with Hadoop 0.20.2 (MRv1)
to
MapR 6.1 with Hadoop 2.7.0 (MRv2)

Root Cause:

This WordCount job has a customized FileOutputCommitter by overwriting Output Format Class:
job.setOutputFormatClass(myOutputFormat.class);
Inside this "myOutputFormat.java",  the logic of the customized FileOutputCommitter is to set different output directory for different reducers:
  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException
  {
    if (this.myCommitter == null)
    {
      Path output = new Path(getOutputDir(context));
      this.myCommitter = new FileOutputCommitter(output, context);
    }
    return this.myCommitter;
  }
  
  protected static String getOutputDir(TaskAttemptContext context)
  {
    int taskID = context.getTaskAttemptID().getTaskID().getId();
    String outputBaseDir = getOutputPath(context) + "/part_" + NUMBER_FORMAT.format(taskID);
    // String outputBaseDir = getOutputPath(context) + "/part_static" ; 
    return outputBaseDir;
}

In MRv1's world:
This job works as expected.
$ hadoop fs -ls -R /hao/wordfinal
drwxr-xr-x   - mapr mapr          1 2019-04-24 14:15 /hao/wordfinal/_logs
drwxr-xr-x   - mapr mapr          1 2019-04-24 14:15 /hao/wordfinal/_logs/history
-rwxr-xr-x   3 mapr mapr      18461 2019-04-24 14:15 /hao/wordfinal/_logs/history/s1.poc.com_1556138521585_job_201904241341_0004_mapr_wordcount.jar
drwxr-xr-x   - mapr mapr          2 2019-04-24 14:15 /hao/wordfinal/part_00000
drwxr-xr-x   - mapr mapr          0 2019-04-24 14:15 /hao/wordfinal/part_00000/_temporary
-rwxr-xr-x   3 mapr mapr         26 2019-04-24 14:15 /hao/wordfinal/part_00000/part-r-00000
drwxr-xr-x   - mapr mapr          2 2019-04-24 14:15 /hao/wordfinal/part_00001
-rwxr-xr-x   3 mapr mapr          0 2019-04-24 14:15 /hao/wordfinal/part_00001/_SUCCESS
-rwxr-xr-x   3 mapr mapr         18 2019-04-24 14:15 /hao/wordfinal/part_00001/part-r-00001
drwxr-xr-x   - mapr mapr          2 2019-04-24 14:15 /hao/wordfinal/part_00002
drwxr-xr-x   - mapr mapr          0 2019-04-24 14:15 /hao/wordfinal/part_00002/_temporary
-rwxr-xr-x   3 mapr mapr          8 2019-04-24 14:15 /hao/wordfinal/part_00002/part-r-00002
This is because all of the 3 reducer outputs are actually moved by the 3 reducers themselves based on Reducer Logs:
# grep -R hao *
attempt_201904241341_0004_r_000000_0/syslog:2019-04-24 14:15:13,124 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000000_0' to /hao/wordfinal/part_00000
attempt_201904241341_0004_r_000000_0/syslog:2019-04-24 14:15:18,257 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000002_0' to /hao/wordfinal/part_00002
attempt_201904241341_0004_r_000001_0/syslog:2019-04-24 14:15:16,376 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000001_0' to /hao/wordfinal/part_00001

In MRv2's world:
"The functionality of JobTracker in 1.x i.e resource management and job scheduling/monitoring are divided into separate daemons. - global ResourceManager (RM) and per-application ApplicationMaster (AM)."

So basically, not just reducers, AM will also call some methods(such as "mergePaths()" in this case) inside FileOutputCommitter.
As described in my previous article <What is the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2>:
When mapreduce.fileoutputcommitter.algorithm.version=1(default value):
AM will do mergePaths() in the end after all reducers complete.
If this MR job has many reducers,  AM will firstly wait for all reducers to finish and then use a single thread to merge the output files.

Here is where the issue is.
In above customized code, the output for each reducer is defined to "part_$taskID".
So developer expects the 3 reducer can write to 3 different sub-directories:
part_00000
part_00001
part_00002
All of the 3 reducers will firstly write its output into a temporary location as below:
part_00000/_temporary/1/task_1554837836642_0091_r_000001/part-r-00000
part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002
After all reducers complete, AM will call "mergePath()" inside FileOutputCommitter to move above files into the final output directory.
However AM's task attempt ID is also 0, so AM thinks the output directory should be part_00000.
That is why only reducer part_00000's output file is moved by AM into final output directory.
After this job completes, below will be the problematic situation:
part_00000/part-r-00000
part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002

Troubleshooting:

When this kind of issue happens in MRv2, we need to simplify the original job to a minimum reproduce like WordCount job in this case.
To get to know "who is doing what", we can enable DEBUG log for Mapper, Reducer and AM:
-Dyarn.app.mapreduce.am.log.level=DEBUG -Dmapreduce.map.log.level=DEBUG -Dmapreduce.reduce.log.level=DEBUG
From AM DEBUG log,we will know which reducers' temporary output is moved by AM, and which reducers' are not.
Eg:
2019-04-18 11:36:28,234 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/part-r-00000
From above DEBUG log, we know only 1st reducer' output is merged by AM.

Solution:

In MRv2's world, parameter "mapreduce.fileoutputcommitter.algorithm.version" is introduced per MAPREDUCE-4815.
Setting "mapreduce.fileoutputcommitter.algorithm.version=2" in MRv2 can achive the same behavior for this specific job in MRv1, which is:
Reducers call "mergePath()" inside FileOutputCommitter instead of AM.

So for this specific customized job, in MRv2, if we set "-Dmapreduce.fileoutputcommitter.algorithm.version=2", then this job output will be moved by reducers.
As a result, below is the expected output file location:
# hadoop fs -ls -R /hao/wordfinal
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/_SUCCESS
drwxr-xr-x   - mapr mapr          2 2019-04-23 11:55 /hao/wordfinal/part_00000
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1/_temporary
drwxr-xr-x   - mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1/_temporary/attempt_1554837836642_0127_r_000000_0
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/part_00000/part-r-00000
drwxr-xr-x   - mapr mapr          2 2019-04-23 11:55 /hao/wordfinal/part_00001
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1/_temporary
drwxr-xr-x   - mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1/_temporary/attempt_1554837836642_0127_r_000001_0
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/part_00001/part-r-00001
drwxr-xr-x   - mapr mapr          2 2019-04-23 11:55 /hao/wordfinal/part_00002
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1
drwxr-xr-x   - mapr mapr          1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1/_temporary
drwxr-xr-x   - mapr mapr          0 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1/_temporary/attempt_1554837836642_0127_r_000002_0
-rwxr-xr-x   3 mapr mapr         51 2019-04-23 11:55 /hao/wordfinal/part_00002/part-r-00002

Reference:

How to customize FileOutputCommitter for MapReduce job by overwriting Output Format Class
What is the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2


No comments:

Post a Comment