[
https://issues.apache.org/jira/browse/MAPREDUCE-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811481#comment-17811481
]
ASF GitHub Bot commented on MAPREDUCE-7470:
-------------------------------------------
saxepran commented on code in PR #6469:
URL: https://github.com/apache/hadoop/pull/6469#discussion_r1468388698
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -100,12 +105,22 @@ public class FileOutputCommitter extends PathOutputCommitter {
public static final boolean FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
+ public static final String FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM = "mapreduce.fileoutputcommitter.jobcommit.thread.io.num";
+
+ public static final int FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM_DEFAULT = 50;
Review Comment:
Should this be a factor of the `availableProcessors` reported to the JVM rather
than a fixed value? That would keep excessive context switching in check.
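
A minimal sketch of that idea, not code from the PR. The class name, the `THREADS_PER_CORE` multiplier, and the use of 50 as an upper bound are all assumptions made for illustration; only `Runtime.getRuntime().availableProcessors()` is a real JDK call.

```java
public class CommitThreadSizing {
  // Hypothetical multiplier: job commit is mostly I/O-bound, so more than
  // one thread per core is assumed to be reasonable.
  static final int THREADS_PER_CORE = 4;

  // Assumed upper bound, reusing the fixed default of 50 from the diff above.
  static final int MAX_THREADS = 50;

  /** Scales the thread count with the core count, clamped to [THREADS_PER_CORE, MAX_THREADS]. */
  public static int defaultCommitThreads(int availableProcessors) {
    int scaled = Math.max(1, availableProcessors) * THREADS_PER_CORE;
    return Math.min(scaled, MAX_THREADS);
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("commit threads: " + defaultCommitThreads(cores));
  }
}
```

The configured value of `mapreduce.fileoutputcommitter.jobcommit.thread.io.num` could still override such a computed default.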
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -509,6 +559,28 @@ private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
+ } else if (algorithmVersion == 3) {
+ boolean rename = false;
+ if (!fs.exists(to)) {
+ commitJobLock.lock();
+ try {
+ if (!fs.exists(to)) { // double check
Review Comment:
I was wondering whether we should drop this double check. Between lines 567
and 568, some other parallel operation could still create `to`, so we cannot
guarantee that `to` is absent at the moment of the rename. At the instant of
renaming, one of three things holds:
1. `to` path is not there
2. `to` path is there and is a directory
3. `to` path is there and is a file
Now, `rename` will return false if `to` exists and is a file (ref:
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=HDFS%20%3A%20The%20rename%20fails%2C%20no%20exception%20is%20raised.%20Instead%20the%20method%20call%20simply%20returns%20false),
and true otherwise. When it returns true, there are two cases:
1. `to` did not exist before the rename -> after the rename, the `to` path is
created.
2. `to` was an existing directory before the rename -> after the rename,
`to/from` is created (ref:
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=If%20the%20destination%20exists%20and%20is%20a%20directory%2C%20the%20final%20destination%20of%20the%20rename%20becomes%20the%20destination%20%2B%20the%20filename%20of%20the%20source%20path.).
The proposal is to:
1. Let the rename happen.
2. If the rename succeeds, don't do anything more. Either `to` was initially
absent or `to` was a directory, in which case the rename will have done
something similar to what lines 578 to 582 do.
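
The proposal above can be sketched roughly as follows. Everything named here is hypothetical: `RenameFs` stands in for the single `FileSystem.rename` call, `ToyFs` is a toy in-memory model of the three outcomes listed above (it tracks paths only, not directory contents), and `renameFirst` is an illustrative name. The comment does not say what should happen when the rename returns false, so the sketch assumes it throws, mirroring the existing `Failed to rename` branch.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RenameFirstSketch {
  /** Hypothetical stand-in for the one FileSystem call used here. */
  public interface RenameFs {
    boolean rename(String src, String dst) throws IOException;
  }

  /** Attempts the rename directly, with no prior exists() check. */
  public static void renameFirst(RenameFs fs, String from, String to)
      throws IOException {
    if (!fs.rename(from, to)) {
      // Assumption: treat a false return as a hard failure.
      throw new IOException("Failed to rename " + from + " to " + to);
    }
  }

  /** Toy model: maps each path to true (directory) or false (file). */
  public static class ToyFs implements RenameFs {
    public final Map<String, Boolean> entries = new HashMap<>();

    @Override
    public boolean rename(String src, String dst) {
      Boolean srcIsDir = entries.get(src);
      if (srcIsDir == null) {
        return false; // toy rule: a missing source is a failed rename
      }
      Boolean dstIsDir = entries.get(dst);
      if (dstIsDir != null && !dstIsDir) {
        return false; // destination exists and is a file
      }
      // Destination absent: move to dst. Destination is a directory:
      // the final path becomes dst + "/" + source file name.
      String name = src.substring(src.lastIndexOf('/') + 1);
      String target = (dstIsDir == null) ? dst : dst + "/" + name;
      if (entries.containsKey(target)) {
        return false; // toy rule: never overwrite an existing entry
      }
      entries.remove(src);
      entries.put(target, srcIsDir);
      return true;
    }
  }
}
```

With this shape there is no window between a check and the rename; the return value of the rename itself is the only signal the caller acts on.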
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -404,6 +423,37 @@ protected void commitJobInternal(JobContext context) throws IOException {
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput, context);
}
+ } else if (algorithmVersion == 3) {
+ ExecutorService pool = Executors.newFixedThreadPool(commitJobThreadIONum);
Review Comment:
Should we have a common `ExecutorService` per object? That would save the
overhead of creating new threads each time this method is called.
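
One possible shape for that, as a sketch only. `SharedCommitPool`, `pool()`, and `runAll` are hypothetical names and not part of the PR; the `java.util.concurrent` calls are standard JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedCommitPool implements AutoCloseable {
  private final int threads;
  private ExecutorService pool; // created on first use, then reused

  public SharedCommitPool(int threads) {
    this.threads = threads;
  }

  /** Returns the shared pool, creating it lazily on the first call. */
  public synchronized ExecutorService pool() {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(threads);
    }
    return pool;
  }

  /** Runs all tasks on the shared pool and returns results in task order. */
  public <T> List<T> runAll(List<Callable<T>> tasks) throws Exception {
    List<T> results = new ArrayList<>();
    for (Future<T> future : pool().invokeAll(tasks)) {
      results.add(future.get()); // rethrows a task failure as ExecutionException
    }
    return results;
  }

  /** Shuts the pool down; a later pool() call would create a fresh one. */
  @Override
  public synchronized void close() {
    if (pool != null) {
      pool.shutdown();
      pool = null;
    }
  }
}
```

The trade-off is lifecycle: a shared pool has to be shut down explicitly, otherwise its non-daemon threads can keep the JVM alive after the commit finishes.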
> multi-thread mapreduce committer
> --------------------------------
>
> Key: MAPREDUCE-7470
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7470
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Components: mrv2
> Reporter: TianyiMa
> Priority: Major
> Labels: mapreduce, pull-request-available
> Attachments: MAPREDUCE-7470.0.patch
>
>
> In cloud environments such as AWS and Aliyun, the network delay is
> non-trivial when we commit thousands of files.
> In our case, the ping delay is about 0.03 ms in the IDC, but after moving to
> the cloud it is about 3 ms, which is roughly 100x slower. We found
> that committing tens of thousands of files takes a few tens of minutes. The
> more files there are, the longer it takes.
> So we propose a new committer algorithm, a variant of committer
> algorithm version 1, called version 3. In this new algorithm 3, to decrease
> the commit time, we use a thread pool to commit the job's final output.
> Our test results in cloud production show that the new algorithm 3
> decreases the commit time by several tens of times.