hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-08 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 261f8abce -> a4dd069ce


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4dd069c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4dd069c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4dd069c

Branch: refs/heads/branch-2.7
Commit: a4dd069ce2dfa4543a975b2f5ff23d469dfd8ead
Parents: 261f8ab
Author: Jason Lowe 
Authored: Fri Dec 8 17:42:09 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 8 17:42:09 2017 -0600

--
 hadoop-mapreduce-project/CHANGES.txt|   3 +
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  71 +-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  19 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 248 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 8 files changed, 310 insertions(+), 63 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dd069c/hadoop-mapreduce-project/CHANGES.txt
--
diff --git a/hadoop-mapreduce-project/CHANGES.txt 
b/hadoop-mapreduce-project/CHANGES.txt
index 484b927..2961023 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -12,6 +12,9 @@ Release 2.7.6 - UNRELEASED
 
   BUG FIXES
 
+MAPREDUCE-5124. AM lacks flow control for task events. (Peter Bacsko via
+jlowe)
+
 Release 2.7.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4dd069c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 6627604..53d758d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends 
CompositeService
   TaskStatus taskStatus) throws IOException, InterruptedException {
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
+
+AtomicReference lastStatusRef =
+

[21/50] [abbrv] hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-06 Thread aengineer
MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273

Branch: refs/heads/HDFS-7240
Commit: 21d36273551fa45c4130e5523b6724358cf34b1e
Parents: 0faf506
Author: Jason Lowe 
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:04:25 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  69 +++-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 315 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 302 insertions(+), 134 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
-@SuppressWarnings({"unchecked"})
 public class TaskAttemptListenerImpl extends CompositeService 
 implements TaskUmbilicalProtocol, TaskAttemptListener {
 
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
 
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 AMFeedback feedback = new AMFeedback();
 feedback.setTaskFound(true);
 
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new 

[16/50] [abbrv] hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread virajith
MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273

Branch: refs/heads/HDFS-9806
Commit: 21d36273551fa45c4130e5523b6724358cf34b1e
Parents: 0faf506
Author: Jason Lowe 
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:04:25 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  69 +++-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 315 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 302 insertions(+), 134 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
-@SuppressWarnings({"unchecked"})
 public class TaskAttemptListenerImpl extends CompositeService 
 implements TaskUmbilicalProtocol, TaskAttemptListener {
 
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
 
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 AMFeedback feedback = new AMFeedback();
 feedback.setTaskFound(true);
 
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new 

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 85eb32b12 -> e650fcf25


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e650fcf2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e650fcf2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e650fcf2

Branch: refs/heads/branch-2.7
Commit: e650fcf25c7a03a9acd018be57527d4660e0709f
Parents: 85eb32b
Author: Jason Lowe 
Authored: Fri Dec 1 14:29:38 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:29:38 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  71 +-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  19 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 248 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 307 insertions(+), 63 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e650fcf2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 6627604..53d758d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends 
CompositeService
   TaskStatus taskStatus) throws IOException, InterruptedException {
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
+
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 taskHeartbeatHandler.progressing(yarnAttemptID);
 TaskAttemptStatus taskAttemptStatus =
 new TaskAttemptStatus();
@@ -386,9 +404,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
-taskAttemptStatus));
+

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 19c18f7cc -> 5abfce077


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5abfce07
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5abfce07
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5abfce07

Branch: refs/heads/branch-2.8
Commit: 5abfce0776cbd9c132f7290d97efcde62dffe829
Parents: 19c18f7
Author: Jason Lowe 
Authored: Fri Dec 1 14:24:58 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:24:58 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  71 +-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 248 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 307 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5abfce07/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 6627604..53d758d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -329,6 +339,14 @@ public class TaskAttemptListenerImpl extends 
CompositeService
   TaskStatus taskStatus) throws IOException, InterruptedException {
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
+
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 taskHeartbeatHandler.progressing(yarnAttemptID);
 TaskAttemptStatus taskAttemptStatus =
 new TaskAttemptStatus();
@@ -386,9 +404,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
-taskAttemptStatus));
+

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 0f5ec01ef -> 74cfca02d


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko

(cherry picked from commit d0fc1cd0c8a92bc8eb37b3134104173e10b9b52f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74cfca02
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74cfca02
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74cfca02

Branch: refs/heads/branch-2.9
Commit: 74cfca02dde44c1327e89b27ea0980f381d27b0e
Parents: 0f5ec01
Author: Jason Lowe 
Authored: Fri Dec 1 14:15:25 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:19:34 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  71 +-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 248 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 307 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/74cfca02/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index fe26ee5..34bd15d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends 
CompositeService
   TaskStatus taskStatus) throws IOException, InterruptedException {
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
+
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 taskHeartbeatHandler.progressing(yarnAttemptID);
 TaskAttemptStatus taskAttemptStatus =
 new TaskAttemptStatus();
@@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new 

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 3c57defaa -> d0fc1cd0c


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d0fc1cd0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d0fc1cd0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d0fc1cd0

Branch: refs/heads/branch-2
Commit: d0fc1cd0c8a92bc8eb37b3134104173e10b9b52f
Parents: 3c57def
Author: Jason Lowe 
Authored: Fri Dec 1 14:15:25 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:15:25 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  71 +-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 248 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 307 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d0fc1cd0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index fe26ee5..34bd15d 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -55,6 +58,8 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -80,6 +85,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -328,6 +338,14 @@ public class TaskAttemptListenerImpl extends 
CompositeService
   TaskStatus taskStatus) throws IOException, InterruptedException {
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
+
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 taskHeartbeatHandler.progressing(yarnAttemptID);
 TaskAttemptStatus taskAttemptStatus =
 new TaskAttemptStatus();
@@ -385,9 +403,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //taskStatus.getIncludeCounters();
 
-context.getEventHandler().handle(
-new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
-taskAttemptStatus));
+

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 83b62a8ee -> 0dd7f1f4e


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko

(cherry picked from commit 21d36273551fa45c4130e5523b6724358cf34b1e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0dd7f1f4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0dd7f1f4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0dd7f1f4

Branch: refs/heads/branch-3.0
Commit: 0dd7f1f4e8d995862b362aa4f37d88ba79aabc9b
Parents: 83b62a8
Author: Jason Lowe 
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:10:02 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  69 +++-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 315 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 302 insertions(+), 134 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0dd7f1f4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
-@SuppressWarnings({"unchecked"})
 public class TaskAttemptListenerImpl extends CompositeService 
 implements TaskUmbilicalProtocol, TaskAttemptListener {
 
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
 
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 AMFeedback feedback = new AMFeedback();
 feedback.setTaskFound(true);
 
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 

hadoop git commit: MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter Bacsko

2017-12-01 Thread jlowe
Repository: hadoop
Updated Branches:
  refs/heads/trunk 0faf50624 -> 21d362735


MAPREDUCE-5124. AM lacks flow control for task events. Contributed by Peter 
Bacsko


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21d36273
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21d36273
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21d36273

Branch: refs/heads/trunk
Commit: 21d36273551fa45c4130e5523b6724358cf34b1e
Parents: 0faf506
Author: Jason Lowe 
Authored: Fri Dec 1 14:03:01 2017 -0600
Committer: Jason Lowe 
Committed: Fri Dec 1 14:04:25 2017 -0600

--
 .../hadoop/mapred/TaskAttemptListenerImpl.java  |  69 +++-
 .../job/event/TaskAttemptStatusUpdateEvent.java |  12 +-
 .../v2/app/job/impl/TaskAttemptImpl.java|  20 +-
 .../mapred/TestTaskAttemptListenerImpl.java | 315 ---
 .../mapreduce/v2/app/TestFetchFailure.java  |   3 +-
 .../mapreduce/v2/app/TestMRClientService.java   |   4 +-
 .../v2/TestSpeculativeExecutionWithMRApp.java   |  13 +-
 7 files changed, 302 insertions(+), 134 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21d36273/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 9b6148c..67f8ff0 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -36,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -58,6 +61,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for talking to the task umblical.
  * It also converts all the old data structures
@@ -66,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
-@SuppressWarnings({"unchecked"})
 public class TaskAttemptListenerImpl extends CompositeService 
 implements TaskUmbilicalProtocol, TaskAttemptListener {
 
@@ -84,6 +88,11 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap
 jvmIDToActiveAttemptMap
   = new ConcurrentHashMap();
+
+  private ConcurrentMap attemptIdToStatus
+= new ConcurrentHashMap<>();
+
   private Set launchedJVMs = Collections
   .newSetFromMap(new ConcurrentHashMap());
 
@@ -359,6 +368,13 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
 TypeConverter.toYarn(taskAttemptID);
 
+AtomicReference lastStatusRef =
+attemptIdToStatus.get(yarnAttemptID);
+if (lastStatusRef == null) {
+  throw new IllegalStateException("Status update was called"
+  + " with illegal TaskAttemptId: " + yarnAttemptID);
+}
+
 AMFeedback feedback = new AMFeedback();
 feedback.setTaskFound(true);
 
@@ -437,9 +453,8 @@ public class TaskAttemptListenerImpl extends 
CompositeService
 //// isn't ever changed by the Task itself.
 //