hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-2.7 261f8abce -> a4dd069ce MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4dd069c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4dd069c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4dd069c Branch: refs/heads/branch-2.7 Commit: a4dd069ce2dfa4543a975b2f5ff23d469dfd8ead Parents: 261f8ab Author: Jason Lowe Authored: Fri Dec 8 17:42:09 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 8 17:42:09 2017 -0600 -- hadoop-mapreduce-project/CHANGES.txt| 3 + .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 19 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 8 files changed, 310 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dd069c/hadoop-mapreduce-project/CHANGES.txt -- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 484b927..2961023 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -12,6 +12,9 @@ Release 2.7.6 - UNRELEASED BUG FIXES +MAPREDUCE-5124. AM lacks flow control for task events. 
(Peter Bacsko via +jlowe) + Release 2.7.5 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dd069c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 6627604..53d758d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import 
org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + +AtomicReference<TaskAttemptStatus> lastStatusRef = +
[21/50] [abbrv] hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273 Branch: refs/heads/HDFS-7240 Commit: 21d36273551fa45c4130e5523b6724358cf34b1e Parents: 0faf506 Author: Jason Lowe Authored: Fri Dec 1 14:03:01 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:04:25 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 69 +++- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 315 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 302 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9b6148c..67f8ff0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import 
java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory; * This class HAS to be in this package to access package private * methods/classes. 
*/ -@SuppressWarnings({"unchecked"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { @@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); +AtomicReference<TaskAttemptStatus> lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); @@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new
[16/50] [abbrv] hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273 Branch: refs/heads/HDFS-9806 Commit: 21d36273551fa45c4130e5523b6724358cf34b1e Parents: 0faf506 Author: Jason Lowe Authored: Fri Dec 1 14:03:01 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:04:25 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 69 +++- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 315 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 302 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9b6148c..67f8ff0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import 
java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory; * This class HAS to be in this package to access package private * methods/classes. 
*/ -@SuppressWarnings({"unchecked"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { @@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); +AtomicReference<TaskAttemptStatus> lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); @@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-2.7 85eb32b12 -> e650fcf25 MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e650fcf2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e650fcf2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e650fcf2 Branch: refs/heads/branch-2.7 Commit: e650fcf25c7a03a9acd018be57527d4660e0709f Parents: 85eb32b Author: Jason Lowe Authored: Fri Dec 1 14:29:38 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:29:38 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 19 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 307 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e650fcf2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 6627604..53d758d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; 
import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. 
* It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + +AtomicReference<TaskAttemptStatus> lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -386,9 +404,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, -taskAttemptStatus)); +
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-2.8 19c18f7cc -> 5abfce077 MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5abfce07 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5abfce07 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5abfce07 Branch: refs/heads/branch-2.8 Commit: 5abfce0776cbd9c132f7290d97efcde62dffe829 Parents: 19c18f7 Author: Jason Lowe Authored: Fri Dec 1 14:24:58 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:24:58 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 307 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5abfce07/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 6627604..53d758d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; 
import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. 
* It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + +AtomicReference<TaskAttemptStatus> lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -386,9 +404,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, -taskAttemptStatus)); +
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-2.9 0f5ec01ef -> 74cfca02d MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko (cherry picked from commit d0fc1cd0c8a92bc8eb37b3134104173e10b9b52f) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74cfca02 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74cfca02 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74cfca02 Branch: refs/heads/branch-2.9 Commit: 74cfca02dde44c1327e89b27ea0980f381d27b0e Parents: 0f5ec01 Author: Jason Lowe Authored: Fri Dec 1 14:15:25 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:19:34 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 307 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/74cfca02/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index fe26ee5..34bd15d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 
+22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. 
* It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>(); + + private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus += new ConcurrentHashMap<>(); + private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + +AtomicReference<TaskAttemptStatus> lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-2 3c57defaa -> d0fc1cd0c MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d0fc1cd0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d0fc1cd0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d0fc1cd0 Branch: refs/heads/branch-2 Commit: d0fc1cd0c8a92bc8eb37b3134104173e10b9b52f Parents: 3c57def Author: Jason Lowe Authored: Fri Dec 1 14:15:25 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:15:25 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 71 +- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java| 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 248 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 307 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0fc1cd0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index fe26ee5..34bd15d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; 
import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. 
* It also converts all the old data structures @@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap (); + + private ConcurrentMap attemptIdToStatus + = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap ()); @@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends CompositeService TaskStatus taskStatus) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + +AtomicReference lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //taskStatus.getIncludeCounters(); -context.getEventHandler().handle( -new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, -taskAttemptStatus)); +
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/branch-3.0 83b62a8ee -> 0dd7f1f4e MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko (cherry picked from commit 21d36273551fa45c4130e5523b6724358cf34b1e) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0dd7f1f4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0dd7f1f4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0dd7f1f4 Branch: refs/heads/branch-3.0 Commit: 0dd7f1f4e8d995862b362aa4f37d88ba79aabc9b Parents: 83b62a8 Author: Jason Lowe Authored: Fri Dec 1 14:03:01 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:10:02 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 69 +++- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java | 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 315 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 302 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0dd7f1f4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9b6148c..67f8ff0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 
+22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory; * This class HAS to be in this package to access package private * methods/classes. 
*/ -@SuppressWarnings({"unchecked"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { @@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap (); + + private ConcurrentMap attemptIdToStatus + = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap ()); @@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); +AtomicReference lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); @@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService
hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko
Repository: hadoop Updated Branches: refs/heads/trunk 0faf50624 -> 21d362735 MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273 Branch: refs/heads/trunk Commit: 21d36273551fa45c4130e5523b6724358cf34b1e Parents: 0faf506 Author: Jason Lowe Authored: Fri Dec 1 14:03:01 2017 -0600 Committer: Jason Lowe Committed: Fri Dec 1 14:04:25 2017 -0600 -- .../hadoop/mapred/TaskAttemptListenerImpl.java | 69 +++- .../job/event/TaskAttemptStatusUpdateEvent.java | 12 +- .../v2/app/job/impl/TaskAttemptImpl.java | 20 +- .../mapred/TestTaskAttemptListenerImpl.java | 315 --- .../mapreduce/v2/app/TestFetchFailure.java | 3 +- .../mapreduce/v2/app/TestMRClientService.java | 4 +- .../v2/TestSpeculativeExecutionWithMRApp.java | 13 +- 7 files changed, 302 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 9b6148c..67f8ff0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.InetSocketAddress; import 
java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * This class is responsible for talking to the task umblical. * It also converts all the old data structures @@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory; * This class HAS to be in this package to access package private * methods/classes. 
*/ -@SuppressWarnings({"unchecked"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { @@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap (); + + private ConcurrentMap attemptIdToStatus + = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap ()); @@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); +AtomicReference lastStatusRef = +attemptIdToStatus.get(yarnAttemptID); +if (lastStatusRef == null) { + throw new IllegalStateException("Status update was called" + + " with illegal TaskAttemptId: " + yarnAttemptID); +} + AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); @@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends CompositeService //// isn't ever changed by the Task itself. //