Tuesday, April 23, 2019

What is the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2

Goal:

This article explains the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2 using a sample wordcount job.

Env:

MapR 6.1
Hadoop 2.7

Solution:

The definition of mapreduce.fileoutputcommitter.algorithm.version is documented here.
The file output committer algorithm version valid algorithm version number: 1 or 2 
default to 1, which is the original algorithm In algorithm version 1, 

1. commitTask will rename directory 
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/ to 
$joboutput/_temporary/$appAttemptID/$taskID/ 

2. recoverTask will also do a rename 
$joboutput/_temporary/$appAttemptID/$taskID/ to 
$joboutput/_temporary/($appAttemptID + 1)/$taskID/ 

3. commitJob will merge every task output file in 
$joboutput/_temporary/$appAttemptID/$taskID/ to 
$joboutput/, 
then it will delete $joboutput/_temporary/ and write $joboutput/_SUCCESS 

It has a performance regression, which is discussed in MAPREDUCE-4815. 
If a job generates many files to commit then the commitJob method call at the end of the job can take minutes. 
the commit is single-threaded and waits until all tasks have completed before commencing. 

algorithm version 2 will change the behavior of commitTask, recoverTask, and commitJob. 

1. commitTask will rename all files in 
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/ to 
$joboutput/ 

2. recoverTask actually doesn't require to do anything, but for upgrade from version 1 to version 2 case, it will check if there are any files in $joboutput/_temporary/($appAttemptID - 1)/$taskID/ and rename them to 
$joboutput/ 

3. commitJob can simply delete $joboutput/_temporary and write $joboutput/_SUCCESS 
This algorithm will reduce the output commit time for large jobs by having the tasks commit directly to the final output directory as they were completing and commitJob had very little to do.

This parameter is introduced by MAPREDUCE-4815.
The reason why this algorithm.version=2 was introduced is to speed up the method "commitJob" inside FileOutputCommitter.java.

In simple, the major difference is who will do the mergePaths() work -- Reducer or ApplicationMaster(AM).
Now let's use a sample wordcount job to explain the details.

1. mapreduce.fileoutputcommitter.algorithm.version=1

AM will do mergePaths() in the end after all reducers complete.
If this MR job has many reducers,  AM will firstly wait for all reducers to finish and then use a single thread to merge the outout files.
So this algorithm has some performance concern for large jobs.

Take above wordcount job for example, the job output directory is set to /hao/wordfinal/mysubdir.
Firstly reducers(here are 3 of them) will write the output into temporary directory:
/hao/wordfinal/mysubdir/_temporary/1/_temporary/attempt_1554837836642_0136_r_000000_0/part-r-00000
/hao/wordfinal/mysubdir/_temporary/1/_temporary/attempt_1554837836642_0136_r_000001_0/part-r-00001
/hao/wordfinal/mysubdir/_temporary/1/_temporary/attempt_1554837836642_0136_r_000002_0/part-r-00002
Above theory can be confirmed from container log from reducers:
# grep -R "Saved output of task" *
container_e07_1554837836642_0136_01_000003/syslog:2019-04-23 15:44:10,158 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1554837836642_0136_r_000000_0' to maprfs:/hao/wordfinal/mysubdir/_temporary/1/task_1554837836642_0136_r_000000
container_e07_1554837836642_0136_01_000004/syslog:2019-04-23 15:44:15,161 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1554837836642_0136_r_000001_0' to maprfs:/hao/wordfinal/mysubdir/_temporary/1/task_1554837836642_0136_r_000001
container_e07_1554837836642_0136_01_000005/syslog:2019-04-23 15:44:20,177 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1554837836642_0136_r_000002_0' to maprfs:/hao/wordfinal/mysubdir/_temporary/1/task_1554837836642_0136_r_000002

After all reducers complete, AM will do the mergePaths() to move those files to the final output directory:
$ hadoop fs -ls -R /hao/wordfinal
drwxr-xr-x   - mapr mapr          4 2019-04-23 15:44 /hao/wordfinal/mysubdir
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 15:44 /hao/wordfinal/mysubdir/_SUCCESS
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 15:44 /hao/wordfinal/mysubdir/part-r-00000
-rwxr-xr-x   3 mapr mapr          0 2019-04-23 15:44 /hao/wordfinal/mysubdir/part-r-00001
-rwxr-xr-x   3 mapr mapr         51 2019-04-23 15:44 /hao/wordfinal/mysubdir/part-r-00002
Above theory can be confirmed from DEBUG log from AM:
# grep " Merging data from" syslog
2019-04-23 15:44:20,263 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:///hao/wordfinal/mysubdir
2019-04-23 15:44:20,265 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/mysubdir/part-r-00002
2019-04-23 15:44:20,268 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:///hao/wordfinal/mysubdir
2019-04-23 15:44:20,270 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/mysubdir/part-r-00001
2019-04-23 15:44:20,272 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:///hao/wordfinal/mysubdir
2019-04-23 15:44:20,274 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/mysubdir/part-r-00000

2. mapreduce.fileoutputcommitter.algorithm.version=2

Each Reducer will do mergePaths() to move their output files into the final output direcotry concurrently.
So this algorithm saves a lot of time for AM when job is commiting.

Take above wordcount job for example, the job output directory is set to /hao/wordfinal/mysubdir.
Firstly reducers(here are 3 of them) will write the output into temporary directory.
This is the same as #1, nothing different.
And then reducers will do mergePaths() to move those files to the final output directory.

Above theory can be confirmed from DEBUG log from one Reducer:
# grep hao *
syslog:2019-04-23 15:57:23,101 DEBUG [main] com.mapr.fs.jni.MapRClient: Create: /hao/wordfinal/mysubdir/_temporary/1/_temporary/attempt_1554837836642_0137_r_000002_0/part-r-00002 mode = 493 replication = 3 chunkSize = default overwrite = false
syslog:2019-04-23 15:57:23,103 DEBUG [main] com.mapr.fs.Inode: >Inode GetAttr: file: /hao/wordfinal/mysubdir/_temporary/1/_temporary/attempt_1554837836642_0137_r_000002_0/part-r-00002, size: 0, chunksize: 268435456, fid: 2049.112100.1092966
syslog:2019-04-23 15:57:23,188 DEBUG [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:///hao/wordfinal/mysubdir
syslog:2019-04-23 15:57:23,192 DEBUG [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/mysubdir/part-r-00002
syslog:2019-04-23 15:57:23,195 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1554837836642_0137_r_000002_0' to maprfs:///hao/wordfinal/mysubdir

So for a normal MR job, the major difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2 is :
Either AM or Reducers will do the mergePaths().

Note: the mergePaths() method is defined inside:
src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

2 comments:

Popular Posts