steveloughran commented on a change in pull request #1916: URL: https://github.com/apache/hadoop/pull/1916#discussion_r412321478
########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -454,6 +471,27 @@ protected void commitJobInternal(JobContext context) throws IOException { */ private void mergePaths(FileSystem fs, final FileStatus from, final Path to, JobContext context) throws IOException { + final List<Future<Void>> futures = new LinkedList<>(); + final ExecutorService pool = mergeThreadNum > 1 ? + Executors.newFixedThreadPool(Math.min(mergeThreadNum, 128)) : null; + + try { + doMergePaths(fs, from, to, context, pool, futures); + if (null != pool) { + for (Future<Void> future: futures) { + FutureIOSupport.awaitFuture(future); Review comment: CompletableFuture.allOf() gives some aggregate future you can block on, so there's no need to wait in the specific order. Not sure if that makes a difference performance-wise. We're still evolving our understanding about how to best use futures, so any suggestions are welcome ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -466,31 +504,50 @@ private void mergePaths(FileSystem fs, final FileStatus from, } if (from.isFile()) { - if (toStat != null) { - if (!fs.delete(to, true)) { - throw new IOException("Failed to delete " + to); + if (null != pool) { + FileStatus finalToStat = toStat; + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (finalToStat != null) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + } + + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); + } + return null; + } + })); + } else { + if (toStat != null) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } } - } - if 
(!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename " + from + " to " + to); + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); Review comment: Though rename/2 is broken that way. Now, if we moved to FileContext, you'd get a proper rename. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -466,31 +504,50 @@ private void mergePaths(FileSystem fs, final FileStatus from, } if (from.isFile()) { - if (toStat != null) { - if (!fs.delete(to, true)) { - throw new IOException("Failed to delete " + to); + if (null != pool) { + FileStatus finalToStat = toStat; + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (finalToStat != null) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); + } + } + + if (!fs.rename(from.getPath(), to)) { + throw new IOException("Failed to rename " + from + " to " + to); + } + return null; + } + })); + } else { + if (toStat != null) { + if (!fs.delete(to, true)) { + throw new IOException("Failed to delete " + to); Review comment: delete only returns false if `to` isn't found; you don't need this safety check. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -99,11 +106,18 @@ public static final boolean FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false; + // The thread num use to merge paths during commitJob. If it is bigger than 1, + // a thread pool would be created to merge paths, which has better performance. 
+ public static final String FILEOUTPUTCOMMITTER_MERGE_THREADS = Review comment: this is going to need some docs in the markdown too, I'm afraid ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -454,6 +471,27 @@ protected void commitJobInternal(JobContext context) throws IOException { */ private void mergePaths(FileSystem fs, final FileStatus from, final Path to, JobContext context) throws IOException { + final List<Future<Void>> futures = new LinkedList<>(); + final ExecutorService pool = mergeThreadNum > 1 ? + Executors.newFixedThreadPool(Math.min(mergeThreadNum, 128)) : null; Review comment: why 128? Also: make an explicit (private) constant. ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java ########## @@ -502,16 +559,28 @@ private void reportProgress(JobContext context) { } private void renameOrMerge(FileSystem fs, FileStatus from, Path to, - JobContext context) throws IOException { + JobContext context, ExecutorService pool, List<Future<Void>> futures) throws IOException { if (algorithmVersion == 1) { - if (!fs.rename(from.getPath(), to)) { - throw new IOException("Failed to rename " + from + " to " + to); + if (null != pool) { + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!fs.rename(from.getPath(), to)) { Review comment: How about factoring rename-throwing-an-exception out into a method? It is being used often enough. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org