[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811481#comment-17811481
 ] 

ASF GitHub Bot commented on MAPREDUCE-7470:
-------------------------------------------

saxepran commented on code in PR #6469:
URL: https://github.com/apache/hadoop/pull/6469#discussion_r1468388698


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -100,12 +105,22 @@ public class FileOutputCommitter extends PathOutputCommitter {
   public static final boolean
       FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
 
+  public static final String FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM =
+      "mapreduce.fileoutputcommitter.jobcommit.thread.io.num";
+
+  public static final int FILEOUTPUTCOMMITTER_COMMIT_JOB_THREAD_IO_NUM_DEFAULT = 50;

Review Comment:
   Should we make this a multiple of the processors available to the JVM
(`availableProcessors`) rather than a fixed value? That would keep excessive
context switching in check.
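
   As a rough sketch of what that could look like: cap the configured value at
a multiple of the processor count. The class `CommitPoolSizing`, the helper
`poolSize`, and the factor of 4 are hypothetical and not part of the PR; the
right multiplier for I/O-bound rename calls would need measuring.

```java
// Hypothetical helper, not part of the PR: derive the commit pool size
// from the processors visible to the JVM instead of a fixed default.
final class CommitPoolSizing {
  private CommitPoolSizing() {}

  /**
   * @param configured value read from configuration; <= 0 means "not set"
   * @param cpus       number of processors available to the JVM
   * @param factor     threads allowed per processor (assumed tuning knob)
   * @return the configured value, capped at cpus * factor (at least 1)
   */
  static int poolSize(int configured, int cpus, int factor) {
    int cap = Math.max(1, cpus * factor);
    if (configured <= 0) {
      return cap;
    }
    return Math.min(configured, cap);
  }

  public static void main(String[] args) {
    int cpus = Runtime.getRuntime().availableProcessors();
    System.out.println("pool size = " + poolSize(50, cpus, 4));
  }
}
```

With this shape the current default of 50 would still apply on large hosts,
while small containers would get a smaller pool.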



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -509,6 +559,28 @@ private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
       }
+    } else if (algorithmVersion == 3) {
+      boolean rename = false;
+      if (!fs.exists(to)) {
+        commitJobLock.lock();
+        try {
+          if (!fs.exists(to)) { // double check

Review Comment:
   I was wondering whether we should drop this double check. Between lines 567
and 568, some other parallel operation could still create `to`, so we cannot
really guarantee that `to` will be absent at the moment of the rename. At the
instant of renaming, one of three things holds:
   1. `to` path is not there
   2. `to` path is there and is a directory
   3. `to` path is there and is a file
   
   Now, rename returns false if the `to` path exists and is a file (ref: 
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=HDFS%20%3A%20The%20rename%20fails%2C%20no%20exception%20is%20raised.%20Instead%20the%20method%20call%20simply%20returns%20false),
 otherwise it returns true. When it returns true, one of two things happened:
   1. `to` did not exist before the rename -> after the rename, the `to` path
has been created.
   2. `to` was an existing directory before the rename -> after the rename,
`to/from` has been created (ref: 
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html#boolean_rename.28Path_src.2C_Path_d.29:~:text=If%20the%20destination%20exists%20and%20is%20a%20directory%2C%20the%20final%20destination%20of%20the%20rename%20becomes%20the%20destination%20%2B%20the%20filename%20of%20the%20source%20path.).
   
   The proposal is to:
   1. Let the rename happen.
   2. If the rename succeeds, do nothing more. Either `to` was initially
absent or `to` was a directory, and in both cases the rename has already done
something similar to what lines 578 to 582 do.
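
   To make the three cases above concrete, here is a toy in-memory model of
the rename outcomes. It is only a sketch: `RenameModel` and its methods are
hypothetical, it is not Hadoop code, and it models a single path entry without
children, following the HDFS behaviour cited in the links above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model (hypothetical, not Hadoop code) of the rename
// outcomes discussed above. Children and name collisions inside an
// existing destination directory are not modelled.
final class RenameModel {
  enum Kind { FILE, DIR }

  private final Map<String, Kind> paths = new HashMap<>();

  void put(String path, Kind kind) {
    paths.put(path, kind);
  }

  boolean exists(String path) {
    return paths.containsKey(path);
  }

  /** Renames the single entry at from; returns false instead of throwing. */
  boolean rename(String from, String to) {
    Kind src = paths.get(from);
    if (src == null) {
      return false; // nothing to rename
    }
    Kind dest = paths.get(to);
    if (dest == Kind.FILE) {
      return false; // case 3: destination exists and is a file
    }
    String target = to; // case 1: destination is absent
    if (dest == Kind.DIR) {
      // case 2: final destination is the directory plus the source name
      target = to + "/" + from.substring(from.lastIndexOf('/') + 1);
    }
    paths.remove(from);
    paths.put(target, src);
    return true;
  }

  public static void main(String[] args) {
    RenameModel fs = new RenameModel();
    fs.put("/pending/part-0", Kind.FILE);
    fs.put("/out", Kind.DIR);
    System.out.println(fs.rename("/pending/part-0", "/out"));
    System.out.println(fs.exists("/out/part-0"));
  }
}
```

In this model the caller only needs the boolean result of `rename` to tell the
file case apart from the other two, which is the point of the proposal.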



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -404,6 +423,37 @@ protected void commitJobInternal(JobContext context) throws IOException {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
           mergePaths(fs, stat, finalOutput, context);
         }
+      } else if (algorithmVersion == 3) {
+        ExecutorService pool = Executors.newFixedThreadPool(commitJobThreadIONum);

Review Comment:
   Should we have a common `ExecutorService` per object? That would save the
overhead of creating new threads each time this method is called.
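
   A minimal sketch of that idea, assuming the pool can live as long as the
committer object: `SharedCommitPool` is a hypothetical holder, not code from
the PR, and where `close()` should be called in the committer lifecycle is
left open.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not the PR's code: one lazily created pool per
// committer object, reused by every call instead of rebuilt each time.
final class SharedCommitPool implements AutoCloseable {
  private final int threads;
  private ExecutorService pool; // guarded by "this"

  SharedCommitPool(int threads) {
    this.threads = threads;
  }

  /** Returns the shared pool, creating it on first use. */
  synchronized ExecutorService get() {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(threads);
    }
    return pool;
  }

  /** Shuts the pool down once the owner is done with it. */
  @Override
  public synchronized void close() {
    if (pool != null) {
      pool.shutdown();
      pool = null;
    }
  }

  public static void main(String[] args) throws Exception {
    try (SharedCommitPool shared = new SharedCommitPool(4)) {
      ExecutorService first = shared.get();
      ExecutorService second = shared.get();
      System.out.println(first == second);
      System.out.println(first.submit(() -> 21 * 2).get());
    }
  }
}
```

The trade-off is that the threads then stay alive between calls, so the owner
has to shut the pool down explicitly.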





> multi-thread mapreduce committer
> --------------------------------
>
>                 Key: MAPREDUCE-7470
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7470
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: mrv2
>            Reporter: TianyiMa
>            Priority: Major
>              Labels: mapreduce, pull-request-available
>         Attachments: MAPREDUCE-7470.0.patch
>
>
> In cloud environments such as AWS and Aliyun, the network delay is 
> non-trivial when we commit thousands of files.
> In our situation, the ping delay is about 0.03ms in the IDC, but after moving 
> to the cloud it is about 3ms, which is roughly 100x slower. We found that 
> committing tens of thousands of files costs a few tens of minutes. The more 
> files there are, the longer it takes.
> So we propose a new committer algorithm, a variant of committer algorithm 
> version 1, called version 3. In this new algorithm 3, to decrease the commit 
> time, we use a thread pool to commit the job's final output.
> Our test results in cloud production show that the new algorithm 3 decreases 
> the commit time by several tens of times.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org
