YARN-611. Added an API to let apps specify an interval beyond which AM failures 
should be ignored towards counting max-attempts. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14e2639f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14e2639f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14e2639f

Branch: refs/heads/HDFS-6584
Commit: 14e2639fd0d53f7e0b58f2f4744af44983d4e867
Parents: 98588cf
Author: Vinod Kumar Vavilapalli <vino...@apache.org>
Authored: Sat Sep 13 18:04:05 2014 -0700
Committer: Vinod Kumar Vavilapalli <vino...@apache.org>
Committed: Sat Sep 13 18:04:05 2014 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/ControlledClock.java       |  43 -------
 .../v2/app/job/impl/TestTaskAttempt.java        |   3 +-
 .../v2/app/rm/TestRMContainerAllocator.java     |   3 +-
 .../mapreduce/v2/hs/TestHistoryFileManager.java |   3 +-
 .../hadoop-mapreduce-client-jobclient/pom.xml   |   6 +
 .../v2/TestSpeculativeExecutionWithMRApp.java   |   3 +-
 hadoop-yarn-project/CHANGES.txt                 |   6 +-
 .../records/ApplicationSubmissionContext.java   |  43 +++++++
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../pb/ApplicationSubmissionContextPBImpl.java  |  13 ++
 .../hadoop/yarn/util/ControlledClock.java       |  42 ++++++
 .../recovery/FileSystemRMStateStore.java        |   1 +
 .../recovery/MemoryRMStateStore.java            |   1 +
 .../resourcemanager/recovery/RMStateStore.java  |  10 +-
 .../recovery/ZKRMStateStore.java                |   2 +
 .../records/ApplicationAttemptStateData.java    |  15 ++-
 .../pb/ApplicationAttemptStateDataPBImpl.java   |  12 ++
 .../server/resourcemanager/rmapp/RMAppImpl.java |  44 ++++++-
 .../rmapp/attempt/RMAppAttempt.java             |   6 +
 .../rmapp/attempt/RMAppAttemptImpl.java         |  27 +++-
 .../yarn_server_resourcemanager_recovery.proto  |   1 +
 .../yarn/server/resourcemanager/MockRM.java     |  23 +++-
 .../applicationsmanager/TestAMRestart.java      | 127 +++++++++++++++++++
 .../recovery/RMStateStoreTestBase.java          |   6 +-
 24 files changed, 372 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java
deleted file mode 100644
index 198117b..0000000
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.mapreduce.v2.app;
-
-import org.apache.hadoop.yarn.util.Clock;
-
-public class ControlledClock implements Clock {
-  private long time = -1;
-  private final Clock actualClock;
-  public ControlledClock(Clock actualClock) {
-    this.actualClock = actualClock;
-  }
-  public synchronized void setTime(long time) {
-    this.time = time;
-  }
-  public synchronized void reset() {
-    time = -1;
-  }
-
-  @Override
-  public synchronized long getTime() {
-    if (time != -1) {
-      return time;
-    }
-    return actualClock.getTime();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 4aa11ed..1330344 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -34,7 +34,6 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,7 +58,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
-import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -87,6 +85,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e554281..9664b8f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -42,10 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
-import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +108,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
index 1e07062..e2e943a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.UUID;
+
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -30,12 +31,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.test.CoreTestDriver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.After;
 import org.junit.AfterClass;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
index 530822f..00eb909 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -60,6 +60,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-nodemanager</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
index 601904b..d2edd19 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
@@ -24,14 +24,12 @@ import java.util.Map;
 import java.util.Random;
 
 import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -44,6 +42,7 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 065869a..9c1abf7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -75,9 +75,13 @@ Release 2.6.0 - UNRELEASED
     YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
     containers to a preconfigured limit. (Varun Vasudev via vinodkv)
 
-    YARN-2033. YARN-2033. Merging generic-history into the Timeline Store 
+    YARN-2033. Merging generic-history into the Timeline Store 
     (Zhijie Shen via junping_du)
 
+    YARN-611. Added an API to let apps specify an interval beyond which AM
+    failures should be ignored towards counting max-attempts. (Xuan Gong via
+    vinodkv)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 1ee04f0..723a2e0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -45,6 +45,15 @@ import java.util.Set;
  *       {@link ContainerLaunchContext} of the container in which the 
  *       <code>ApplicationMaster</code> is executed.
  *     </li>
+ *     <li>maxAppAttempts. The maximum number of application attempts.
+ *     It should be no larger than the global number of max attempts in the
+ *     Yarn configuration.</li>
+ *     <li>attemptFailuresValidityInterval. The default value is -1.
+ *     when attemptFailuresValidityInterval in milliseconds is set to > 0,
+ *     the failure number will no take failures which happen out of the
+ *     validityInterval into failure count. If failure count reaches to
+ *     maxAppAttempts, the application will be failed.
+ *     </li>
  *   </ul>
  * </p>
  * 
@@ -103,6 +112,22 @@ public abstract class ApplicationSubmissionContext {
       resource, null);
   }
 
+  @Public
+  @Stable
+  public static ApplicationSubmissionContext newInstance(
+      ApplicationId applicationId, String applicationName, String queue,
+      Priority priority, ContainerLaunchContext amContainer,
+      boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+      int maxAppAttempts, Resource resource, String applicationType,
+      boolean keepContainers, long attemptFailuresValidityInterval) {
+    ApplicationSubmissionContext context =
+        newInstance(applicationId, applicationName, queue, priority,
+          amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+          resource, applicationType, keepContainers);
+    
context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
+    return context;
+  }
+
   /**
    * Get the <code>ApplicationId</code> of the submitted application.
    * @return <code>ApplicationId</code> of the submitted application
@@ -338,4 +363,22 @@ public abstract class ApplicationSubmissionContext {
   @Public
   @Stable
   public abstract void setApplicationTags(Set<String> tags);
+
+  /**
+   * Get the attemptFailuresValidityInterval in milliseconds for the 
application
+   *
+   * @return the attemptFailuresValidityInterval
+   */
+  @Public
+  @Stable
+  public abstract long getAttemptFailuresValidityInterval();
+
+  /**
+   * Set the attemptFailuresValidityInterval in milliseconds for the 
application
+   * @param attemptFailuresValidityInterval
+   */
+  @Public
+  @Stable
+  public abstract void setAttemptFailuresValidityInterval(
+      long attemptFailuresValidityInterval);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d84de4a..d8c42cc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -291,6 +291,7 @@ message ApplicationSubmissionContextProto {
   optional string applicationType = 10 [default = "YARN"];
   optional bool keep_containers_across_application_attempts = 11 [default = 
false];
   repeated string applicationTags = 12;
+  optional int64 attempt_failures_validity_interval = 13 [default = -1];
 }
 
 enum ApplicationAccessTypeProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c2f3268..7b49a16 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -402,4 +402,17 @@ extends ApplicationSubmissionContext {
   private ResourceProto convertToProtoFormat(Resource t) {
     return ((ResourcePBImpl)t).getProto();
   }
+
+  @Override
+  public long getAttemptFailuresValidityInterval() {
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getAttemptFailuresValidityInterval();
+  }
+
+  @Override
+  public void setAttemptFailuresValidityInterval(
+      long attemptFailuresValidityInterval) {
+    maybeInitBuilder();
+    
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
+  }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java
new file mode 100644
index 0000000..16bd785
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.util;
+
+
+public class ControlledClock implements Clock {
+  private long time = -1;
+  private final Clock actualClock;
+  public ControlledClock(Clock actualClock) {
+    this.actualClock = actualClock;
+  }
+  public synchronized void setTime(long time) {
+    this.time = time;
+  }
+  public synchronized void reset() {
+    time = -1;
+  }
+
+  @Override
+  public synchronized long getTime() {
+    if (time != -1) {
+      return time;
+    }
+    return actualClock.getTime();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 216659b..296f177 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -282,6 +282,7 @@ public class FileSystemRMStateStore extends RMStateStore {
                   attemptStateData.getDiagnostics(),
                   attemptStateData.getFinalApplicationStatus(),
                   attemptStateData.getAMContainerExitStatus(),
+                  attemptStateData.getFinishTime(),
                   attemptStateData.getMemorySeconds(),
                   attemptStateData.getVcoreSeconds());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index f208c74..a67da2c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -171,6 +171,7 @@ public class MemoryRMStateStore extends RMStateStore {
           attemptStateData.getDiagnostics(),
           attemptStateData.getFinalApplicationStatus(),
           attemptStateData.getAMContainerExitStatus(),
+          attemptStateData.getFinishTime(),
           attemptStateData.getMemorySeconds(),
           attemptStateData.getVcoreSeconds());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index df4f3a9..973fe54 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -261,6 +261,7 @@ public abstract class RMStateStore extends AbstractService {
     final Container masterContainer;
     final Credentials appAttemptCredentials;
     long startTime = 0;
+    long finishTime = 0;
     // fields set when attempt completes
     RMAppAttemptState state;
     String finalTrackingUrl = "N/A";
@@ -274,14 +275,15 @@ public abstract class RMStateStore extends 
AbstractService {
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime, long memorySeconds, long vcoreSeconds) {
       this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
-        null, "", null, ContainerExitStatus.INVALID, memorySeconds, 
vcoreSeconds);
+        null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, 
vcoreSeconds);
     }
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime, RMAppAttemptState state, String finalTrackingUrl,
         String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
-        int exitStatus, long memorySeconds, long vcoreSeconds) {
+        int exitStatus, long finishTime, long memorySeconds,
+        long vcoreSeconds) {
       this.attemptId = attemptId;
       this.masterContainer = masterContainer;
       this.appAttemptCredentials = appAttemptCredentials;
@@ -291,6 +293,7 @@ public abstract class RMStateStore extends AbstractService {
       this.diagnostics = diagnostics == null ? "" : diagnostics;
       this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
       this.exitStatus = exitStatus;
+      this.finishTime = finishTime;
       this.memorySeconds = memorySeconds;
       this.vcoreSeconds = vcoreSeconds;
     }
@@ -328,6 +331,9 @@ public abstract class RMStateStore extends AbstractService {
     public long getVcoreSeconds() {
       return vcoreSeconds;
     }
+    public long getFinishTime() {
+      return this.finishTime;
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 66fe988..6cdf4ac 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -604,9 +604,11 @@ public class ZKRMStateStore extends RMStateStore {
               attemptStateData.getDiagnostics(),
               attemptStateData.getFinalApplicationStatus(),
               attemptStateData.getAMContainerExitStatus(),
+              attemptStateData.getFinishTime(),
               attemptStateData.getMemorySeconds(),
               attemptStateData.getVcoreSeconds());
 
+
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index ad8cdae..63ef8f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -44,7 +44,7 @@ public abstract class ApplicationAttemptStateData {
       ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
       String finalTrackingUrl, String diagnostics,
       FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
-      long memorySeconds, long vcoreSeconds) {
+      long finishTime, long memorySeconds, long vcoreSeconds) {
     ApplicationAttemptStateData attemptStateData =
         Records.newRecord(ApplicationAttemptStateData.class);
     attemptStateData.setAttemptId(attemptId);
@@ -56,6 +56,7 @@ public abstract class ApplicationAttemptStateData {
     attemptStateData.setStartTime(startTime);
     attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
     attemptStateData.setAMContainerExitStatus(exitStatus);
+    attemptStateData.setFinishTime(finishTime);
     attemptStateData.setMemorySeconds(memorySeconds);
     attemptStateData.setVcoreSeconds(vcoreSeconds);
     return attemptStateData;
@@ -75,7 +76,7 @@ public abstract class ApplicationAttemptStateData {
       attemptState.getStartTime(), attemptState.getState(),
       attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
       attemptState.getFinalApplicationStatus(),
-      attemptState.getAMContainerExitStatus(),
+      attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
       attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
   }
 
@@ -163,7 +164,15 @@ public abstract class ApplicationAttemptStateData {
   public abstract void setAMContainerExitStatus(int exitStatus);
 
   /**
-   * Get the <em>memory seconds</em> (in MB seconds) of the application.
+   * Get the <em>finish time</em> of the application attempt.
+   * @return <em>finish time</em> of the application attempt
+   */
+  public abstract long getFinishTime();
+
+  public abstract void setFinishTime(long finishTime);
+
+  /**
+  * Get the <em>memory seconds</em> (in MB seconds) of the application.
    * @return <em>memory seconds</em> (in MB seconds) of the application
    */
   @Public

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index 4d6212d..516af2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -318,4 +318,16 @@ public class ApplicationAttemptStateDataPBImpl extends
   private FinalApplicationStatus 
convertFromProtoFormat(FinalApplicationStatusProto s) {
     return ProtoUtils.convertFromProtoFormat(s);
   }
+
+  @Override
+  public long getFinishTime() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getFinishTime();
+  }
+
+  @Override
+  public void setFinishTime(long finishTime) {
+    maybeInitBuilder();
+    builder.setFinishTime(finishTime);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2dd2040..815b86a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -32,10 +32,10 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -81,8 +81,12 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
 
@@ -110,6 +114,12 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final String applicationType;
   private final Set<String> applicationTags;
 
+  private final long attemptFailuresValidityInterval;
+
+  private Clock systemClock;
+
+  private boolean isNumAttemptsBeyondThreshold = false;
+
   // Mutable fields
   private long startTime;
   private long finishTime = 0;
@@ -328,6 +338,8 @@ public class RMAppImpl implements RMApp, Recoverable {
       ApplicationMasterService masterService, long submitTime,
       String applicationType, Set<String> applicationTags) {
 
+    this.systemClock = new SystemClock();
+
     this.applicationId = applicationId;
     this.name = name;
     this.rmContext = rmContext;
@@ -340,7 +352,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = this.systemClock.getTime();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
 
@@ -358,6 +370,9 @@ public class RMAppImpl implements RMApp, Recoverable {
       this.maxAppAttempts = individualMaxAppAttempts;
     }
 
+    this.attemptFailuresValidityInterval =
+        submissionContext.getAttemptFailuresValidityInterval();
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -897,7 +912,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
+    } else if (this.isNumAttemptsBeyondThreshold) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -930,7 +945,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       RMAppState stateToBeStored) {
     rememberTargetTransitions(event, transitionToDo, targetFinalState);
     this.stateBeforeFinalSaving = getState();
-    this.storedFinishTime = System.currentTimeMillis();
+    this.storedFinishTime = this.systemClock.getTime();
 
     LOG.info("Updating application " + this.applicationId
         + " with final state: " + this.targetedFinalState);
@@ -1097,7 +1112,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
       app.finishTime = app.storedFinishTime;
       if (app.finishTime == 0 ) {
-        app.finishTime = System.currentTimeMillis();
+        app.finishTime = app.systemClock.getTime();
       }
       // Recovered apps that are completed were not added to scheduler, so no
       // need to remove them from scheduler.
@@ -1118,11 +1133,16 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   private int getNumFailedAppAttempts() {
     int completedAttempts = 0;
+    long endTime = this.systemClock.getTime();
     // Do not count AM preemption, hardware failures or NM resync
     // as attempt failure.
     for (RMAppAttempt attempt : attempts.values()) {
       if (attempt.shouldCountTowardsMaxAttemptRetry()) {
-        completedAttempts++;
+        if (this.attemptFailuresValidityInterval <= 0
+            || (attempt.getFinishTime() > endTime
+                - this.attemptFailuresValidityInterval)) {
+          completedAttempts++;
+        }
       }
     }
     return completedAttempts;
@@ -1139,8 +1159,9 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+      int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
-          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+          && numberOfFailure < app.maxAppAttempts) {
         boolean transferStateFromPreviousAttempt = false;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
@@ -1158,6 +1179,9 @@ public class RMAppImpl implements RMApp, Recoverable {
         }
         return initialState;
       } else {
+        if (numberOfFailure >= app.maxAppAttempts) {
+          app.isNumAttemptsBeyondThreshold = true;
+        }
         app.rememberTargetTransitionsAndStoreState(event,
           new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
           RMAppState.FAILED);
@@ -1244,4 +1268,10 @@ public class RMAppImpl implements RMApp, Recoverable {
         numNonAMContainerPreempted, numAMContainerPreempted,
         memorySeconds, vcoreSeconds);
   }
+
+  @Private
+  @VisibleForTesting
+  public void setSystemClock(Clock clock) {
+    this.systemClock = clock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b5ed92c..943a5e5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -213,4 +213,10 @@ public interface RMAppAttempt extends 
EventHandler<RMAppAttemptEvent> {
    * @return metrics
    */
   RMAppAttemptMetrics getRMAppAttemptMetrics();
+
+  /**
+   * the finish time of the application attempt.
+   * @return the finish time of the application attempt.
+   */
+  long getFinishTime();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1489696..863130f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -85,7 +84,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -142,6 +140,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
   private String originalTrackingUrl = "N/A";
   private String proxiedTrackingUrl = "N/A";
   private long startTime = 0;
+  private long finishTime = 0;
 
   // Set to null initially. Will eventually get set 
   // if an RMAppAttemptUnregistrationEvent occurs
@@ -739,6 +738,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
     this.finalStatus = attemptState.getFinalApplicationStatus();
     this.startTime = attemptState.getStartTime();
+    this.finishTime = attemptState.getFinishTime();
     this.attemptMetrics.updateAggregateAppResourceUsage(
         attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
   }
@@ -1028,11 +1028,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     AggregateAppResourceUsage resUsage =
         this.attemptMetrics.getAggregateAppResourceUsage();
     RMStateStore rmStore = rmContext.getStateStore();
+    setFinishTime(System.currentTimeMillis());
     ApplicationAttemptState attemptState =
         new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
           rmStore.getCredentialsFromAppAttempt(this), startTime,
           stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
-          resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
+          getFinishTime(), resUsage.getMemorySeconds(),
+          resUsage.getVcoreSeconds());
     LOG.info("Updating application attempt " + applicationAttemptId
         + " with final state: " + targetedFinalState + ", and exit status: "
         + exitStatus);
@@ -1747,4 +1749,23 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
     // lock
     return attemptMetrics;
   }
+
+  @Override
+  public long getFinishTime() {
+    try {
+      this.readLock.lock();
+      return this.finishTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  private void setFinishTime(long finishTime) {
+    try {
+      this.writeLock.lock();
+      this.finishTime = finishTime;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
index 5125a27..4d29153 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
@@ -80,6 +80,7 @@ message ApplicationAttemptStateDataProto {
     optional int32 am_container_exit_status = 9 [default = -1000];
     optional int64 memory_seconds = 10;
     optional int64 vcore_seconds = 11;
+    optional int64 finish_time = 12;
 }
 
 message EpochProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 3817637..1338a6c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -278,7 +278,16 @@ public class MockRM extends ResourceManager {
       boolean waitForAccepted, boolean keepContainers) throws Exception {
     return submitApp(masterMemory, name, user, acls, unmanaged, queue,
         maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
-        false, null);
+        false, null, 0);
+  }
+
+  public RMApp submitApp(int masterMemory, long 
attemptFailuresValidityInterval)
+      throws Exception {
+    return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+      .getShortUserName(), null, false, null,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+      false, null, attemptFailuresValidityInterval);
   }
 
   public RMApp submitApp(int masterMemory, String name, String user,
@@ -286,6 +295,17 @@ public class MockRM extends ResourceManager {
       int maxAppAttempts, Credentials ts, String appType,
       boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
       ApplicationId applicationId) throws Exception {
+    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+      isAppIdProvided, applicationId, 0);
+  }
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+      ApplicationId applicationId, long attemptFailuresValidityInterval)
+      throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
     if (! isAppIdProvided) {
@@ -321,6 +341,7 @@ public class MockRM extends ResourceManager {
       clc.setTokens(securityTokens);
     }
     sub.setAMContainerSpec(clc);
+    sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] 
{"someGroup"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 6c5c818..fcb4e45 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -53,7 +54,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -584,4 +587,128 @@ public class TestAMRestart {
     rm1.stop();
     rm2.stop();
   }
+
+  @Test (timeout = 50000)
+  public void testRMAppAttemptFailuresValidityInterval() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // explicitly set max-am-retry count as 2.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // set window size to a larger number : 20s
+    // we will verify the app should be failed if
+    // two continuous attempts failed in 20s.
+    RMApp app = rm1.submitApp(200, 20000);
+    
+    MockAM am = MockRM.launchAM(app, rm1, nm1);
+    // Fail current attempt normally
+    nm1.nodeHeartbeat(am.getApplicationAttemptId(),
+      1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FAILED);
+    // launch the second attempt
+    rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app.getAppAttempts().size());
+    Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
+      .mayBeLastAttempt());
+    MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
+    am_2.waitForState(RMAppAttemptState.RUNNING);
+    nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
+      1, ContainerState.COMPLETE);
+    am_2.waitForState(RMAppAttemptState.FAILED);
+    // current app should be failed.
+    rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
+
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    // set window size to 6s
+    RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);;
+    app1.setSystemClock(clock);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Fail attempt1 normally
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
+      1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+
+    // launch the second attempt
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, app1.getAppAttempts().size());
+
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
+    MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am2.waitForState(RMAppAttemptState.RUNNING);
+
+    // wait for 6 seconds
+    clock.setTime(System.currentTimeMillis() + 6*1000);
+    // Fail attempt2 normally
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
+      1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+
+    // can launch the third attempt successfully
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(3, app1.getAppAttempts().size());
+    RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
+    clock.reset();
+    MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    am3.waitForState(RMAppAttemptState.RUNNING);
+
+    // Restart rm.
+    @SuppressWarnings("resource")
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // re-register the NM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
+    status
+      .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    status.setContainerId(attempt3.getMasterContainer().getId());
+    status.setContainerState(ContainerState.COMPLETE);
+    status.setDiagnostics("");
+    nm1.registerNode(Collections.singletonList(status), null);
+
+    rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Lauch Attempt 4
+    MockAM am4 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
+
+    // wait for 6 seconds
+    clock.setTime(System.currentTimeMillis() + 6*1000);
+    // Fail attempt4 normally
+    nm1
+      .nodeHeartbeat(am4.getApplicationAttemptId(), 1, 
ContainerState.COMPLETE);
+    am4.waitForState(RMAppAttemptState.FAILED);
+
+    // can launch the 5th attempt successfully
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    MockAM am5 =
+        rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
+    clock.reset();
+    am5.waitForState(RMAppAttemptState.RUNNING);
+
+    // Fail attempt5 normally
+    nm1
+      .nodeHeartbeat(am5.getApplicationAttemptId(), 1, 
ContainerState.COMPLETE);
+    am5.waitForState(RMAppAttemptState.FAILED);
+
+    rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+    rm1.stop();
+    rm2.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index a0ddc85..85022d9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -318,7 +318,8 @@ public class RMStateStoreTestBase extends 
ClientBaseWithFixes{
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED, 100, 0, 0);
+          FinalApplicationStatus.SUCCEEDED, 100,
+          oldAttemptState.getFinishTime(), 0, 0);
     store.updateApplicationAttemptState(newAttemptState);
 
     // test updating the state of an app/attempt whose initial state was not
@@ -341,7 +342,8 @@ public class RMStateStoreTestBase extends 
ClientBaseWithFixes{
           oldAttemptState.getAppAttemptCredentials(),
           oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
           "myTrackingUrl", "attemptDiagnostics",
-          FinalApplicationStatus.SUCCEEDED, 111, 0, 0);
+          FinalApplicationStatus.SUCCEEDED, 111,
+          oldAttemptState.getFinishTime(), 0, 0);
     store.updateApplicationAttemptState(dummyAttempt);
 
     // let things settle down

Reply via email to