hadoop git commit: MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 e4dcc3e60 -> 4c238b50d

MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c238b50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c238b50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c238b50

Branch: refs/heads/branch-3.0
Commit: 4c238b50dfd83a10923bfd6eb28d7a6ad864a40f
Parents: e4dcc3e
Author: Jason Lowe
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe
Committed: Wed Nov 28 16:10:02 2018 -0600

--
 .../lib/output/FileOutputCommitter.java | 28 +++--
 .../lib/output/TestFileOutputCommitter.java | 33
 2 files changed, 51 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c238b50/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
--
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 86af2cf..0ed3259 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -389,7 +390,7 @@ public class FileOutputCommitter extends PathOutputCommitter {

       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }

@@ -440,10 +441,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -467,22 +469,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }

-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -491,7 +499,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -583,7 +591,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from
hadoop git commit: MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.1 d9457df98 -> e7fa638fe

MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7fa638f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7fa638f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7fa638f

Branch: refs/heads/branch-3.1
Commit: e7fa638fe8588174d5d3db287779531de09a3e1b
Parents: d9457df
Author: Jason Lowe
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe
Committed: Wed Nov 28 16:01:05 2018 -0600

--
 .../lib/output/FileOutputCommitter.java | 28 +++--
 .../lib/output/TestFileOutputCommitter.java | 33
 2 files changed, 51 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7fa638f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
--
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index cbae575..94af338 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {

       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }

@@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }

-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from
hadoop git commit: MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.2 df0e7766e -> 7a78bdf7b

MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

(cherry picked from commit 4d8de7ab690ef919b392b12d856482a6a1f2bb3d)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a78bdf7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a78bdf7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a78bdf7

Branch: refs/heads/branch-3.2
Commit: 7a78bdf7bbf278678dc10de3133930723972b60d
Parents: df0e776
Author: Jason Lowe
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe
Committed: Wed Nov 28 15:54:59 2018 -0600

--
 .../lib/output/FileOutputCommitter.java | 28 +++--
 .../lib/output/TestFileOutputCommitter.java | 33
 2 files changed, 51 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a78bdf7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
--
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index cbae575..94af338 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {

       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }

@@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }

-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from
hadoop git commit: MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla
Repository: hadoop
Updated Branches:
  refs/heads/trunk 300f560fc -> 4d8de7ab6

MAPREDUCE-7164. FileOutputCommitter does not report progress while merging paths. Contributed by Kuhu Shukla

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d8de7ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d8de7ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d8de7ab

Branch: refs/heads/trunk
Commit: 4d8de7ab690ef919b392b12d856482a6a1f2bb3d
Parents: 300f560
Author: Jason Lowe
Authored: Wed Nov 28 14:54:59 2018 -0600
Committer: Jason Lowe
Committed: Wed Nov 28 14:54:59 2018 -0600

--
 .../lib/output/FileOutputCommitter.java | 28 +++--
 .../lib/output/TestFileOutputCommitter.java | 33
 2 files changed, 51 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d8de7ab/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
--
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index cbae575..94af338 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {

       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }

@@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }

-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
               committedTaskPath);
         } else {
           // directly merge everything from taskAttemptPath to output directory
-          mergePaths(fs, taskAttemptDirStatus,