YARN-611. Added an API to let apps specify an interval beyond which AM failures should be ignored towards counting max-attempts. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14e2639f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14e2639f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14e2639f Branch: refs/heads/HDFS-6584 Commit: 14e2639fd0d53f7e0b58f2f4744af44983d4e867 Parents: 98588cf Author: Vinod Kumar Vavilapalli <vino...@apache.org> Authored: Sat Sep 13 18:04:05 2014 -0700 Committer: Vinod Kumar Vavilapalli <vino...@apache.org> Committed: Sat Sep 13 18:04:05 2014 -0700 ---------------------------------------------------------------------- .../mapreduce/v2/app/ControlledClock.java | 43 ------- .../v2/app/job/impl/TestTaskAttempt.java | 3 +- .../v2/app/rm/TestRMContainerAllocator.java | 3 +- .../mapreduce/v2/hs/TestHistoryFileManager.java | 3 +- .../hadoop-mapreduce-client-jobclient/pom.xml | 6 + .../v2/TestSpeculativeExecutionWithMRApp.java | 3 +- hadoop-yarn-project/CHANGES.txt | 6 +- .../records/ApplicationSubmissionContext.java | 43 +++++++ .../src/main/proto/yarn_protos.proto | 1 + .../pb/ApplicationSubmissionContextPBImpl.java | 13 ++ .../hadoop/yarn/util/ControlledClock.java | 42 ++++++ .../recovery/FileSystemRMStateStore.java | 1 + .../recovery/MemoryRMStateStore.java | 1 + .../resourcemanager/recovery/RMStateStore.java | 10 +- .../recovery/ZKRMStateStore.java | 2 + .../records/ApplicationAttemptStateData.java | 15 ++- .../pb/ApplicationAttemptStateDataPBImpl.java | 12 ++ .../server/resourcemanager/rmapp/RMAppImpl.java | 44 ++++++- .../rmapp/attempt/RMAppAttempt.java | 6 + .../rmapp/attempt/RMAppAttemptImpl.java | 27 +++- .../yarn_server_resourcemanager_recovery.proto | 1 + .../yarn/server/resourcemanager/MockRM.java | 23 +++- .../applicationsmanager/TestAMRestart.java | 127 +++++++++++++++++++ .../recovery/RMStateStoreTestBase.java | 6 +- 24 files changed, 372 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java deleted file mode 100644 index 198117b..0000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ControlledClock.java +++ /dev/null @@ -1,43 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.hadoop.mapreduce.v2.app; - -import org.apache.hadoop.yarn.util.Clock; - -public class ControlledClock implements Clock { - private long time = -1; - private final Clock actualClock; - public ControlledClock(Clock actualClock) { - this.actualClock = actualClock; - } - public synchronized void setTime(long time) { - this.time = time; - } - public synchronized void reset() { - time = -1; - } - - @Override - public synchronized long getTime() { - if (time != -1) { - return time; - } - return actualClock.getTime(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 4aa11ed..1330344 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -34,7 +34,6 @@ import java.util.Iterator; import java.util.Map; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,7 +58,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; -import 
org.apache.hadoop.mapreduce.v2.app.ControlledClock; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -87,6 +85,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Test; import org.mockito.ArgumentCaptor; http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e554281..9664b8f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -42,10 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; -import org.apache.hadoop.mapreduce.v2.app.ControlledClock; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; @@ -110,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java index 1e07062..e2e943a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java @@ -22,6 +22,7 @@ package org.apache.hadoop.mapreduce.v2.hs; import java.io.File; import java.io.FileOutputStream; import java.util.UUID; + import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -30,12 +31,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.mapreduce.v2.app.ControlledClock; import 
org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.test.CoreTestDriver; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; import org.junit.AfterClass; http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index 530822f..00eb909 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -60,6 +60,12 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-nodemanager</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index 601904b..d2edd19 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -24,14 +24,12 @@ import java.util.Map; import java.util.Random; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; -import org.apache.hadoop.mapreduce.v2.app.ControlledClock; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -44,6 +42,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 065869a..9c1abf7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -75,9 +75,13 @@ Release 2.6.0 - UNRELEASED YARN-2440. 
Enabled Nodemanagers to limit the aggregate cpu usage across all containers to a preconfigured limit. (Varun Vasudev via vinodkv) - YARN-2033. YARN-2033. Merging generic-history into the Timeline Store + YARN-2033. Merging generic-history into the Timeline Store (Zhijie Shen via junping_du) + YARN-611. Added an API to let apps specify an interval beyond which AM + failures should be ignored towards counting max-attempts. (Xuan Gong via + vinodkv) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 1ee04f0..723a2e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -45,6 +45,15 @@ import java.util.Set; * {@link ContainerLaunchContext} of the container in which the * <code>ApplicationMaster</code> is executed. * </li> + * <li>maxAppAttempts. The maximum number of application attempts. + * It should be no larger than the global number of max attempts in the + * Yarn configuration.</li> + * <li>attemptFailuresValidityInterval. The default value is -1. + * when attemptFailuresValidityInterval in milliseconds is set to > 0, + * the failure number will not take failures which happen out of the + * validityInterval into failure count. 
If failure count reaches to + * maxAppAttempts, the application will be failed. + * </li> * </ul> * </p> * @@ -103,6 +112,22 @@ public abstract class ApplicationSubmissionContext { resource, null); } + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, long attemptFailuresValidityInterval) { + ApplicationSubmissionContext context = + newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers); + context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); + return context; + } + /** * Get the <code>ApplicationId</code> of the submitted application. * @return <code>ApplicationId</code> of the submitted application @@ -338,4 +363,22 @@ public abstract class ApplicationSubmissionContext { @Public @Stable public abstract void setApplicationTags(Set<String> tags); + + /** + * Get the attemptFailuresValidityInterval in milliseconds for the application + * + * @return the attemptFailuresValidityInterval + */ + @Public + @Stable + public abstract long getAttemptFailuresValidityInterval(); + + /** + * Set the attemptFailuresValidityInterval in milliseconds for the application + * @param attemptFailuresValidityInterval + */ + @Public + @Stable + public abstract void setAttemptFailuresValidityInterval( + long attemptFailuresValidityInterval); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d84de4a..d8c42cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -291,6 +291,7 @@ message ApplicationSubmissionContextProto { optional string applicationType = 10 [default = "YARN"]; optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; + optional int64 attempt_failures_validity_interval = 13 [default = -1]; } enum ApplicationAccessTypeProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index c2f3268..7b49a16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -402,4 +402,17 @@ extends ApplicationSubmissionContext { private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } + + @Override + public long getAttemptFailuresValidityInterval() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getAttemptFailuresValidityInterval(); + } + + @Override + public void setAttemptFailuresValidityInterval( + long attemptFailuresValidityInterval) { + maybeInitBuilder(); + builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java new file mode 100644 index 0000000..16bd785 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ControlledClock.java @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.util; + + +public class ControlledClock implements Clock { + private long time = -1; + private final Clock actualClock; + public ControlledClock(Clock actualClock) { + this.actualClock = actualClock; + } + public synchronized void setTime(long time) { + this.time = time; + } + public synchronized void reset() { + time = -1; + } + + @Override + public synchronized long getTime() { + if (time != -1) { + return time; + } + return actualClock.getTime(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 216659b..296f177 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -282,6 +282,7 @@ public class FileSystemRMStateStore extends RMStateStore { attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime(), attemptStateData.getMemorySeconds(), attemptStateData.getVcoreSeconds()); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index f208c74..a67da2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -171,6 +171,7 @@ public class MemoryRMStateStore extends RMStateStore { attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime(), attemptStateData.getMemorySeconds(), attemptStateData.getVcoreSeconds()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index df4f3a9..973fe54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -261,6 +261,7 @@ public abstract class RMStateStore extends AbstractService { final Container masterContainer; final Credentials appAttemptCredentials; long startTime = 0; + long finishTime = 0; // fields set when attempt completes RMAppAttemptState state; String finalTrackingUrl = "N/A"; @@ -274,14 +275,15 @@ public abstract class RMStateStore extends AbstractService { Container masterContainer, Credentials appAttemptCredentials, long startTime, long memorySeconds, long vcoreSeconds) { this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds); + null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime, RMAppAttemptState state, String finalTrackingUrl, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus, long memorySeconds, long vcoreSeconds) { + int exitStatus, long finishTime, long memorySeconds, + long vcoreSeconds) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; @@ -291,6 +293,7 @@ public abstract class RMStateStore extends AbstractService { this.diagnostics = diagnostics == null ? 
"" : diagnostics; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; this.exitStatus = exitStatus; + this.finishTime = finishTime; this.memorySeconds = memorySeconds; this.vcoreSeconds = vcoreSeconds; } @@ -328,6 +331,9 @@ public abstract class RMStateStore extends AbstractService { public long getVcoreSeconds() { return vcoreSeconds; } + public long getFinishTime() { + return this.finishTime; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 66fe988..6cdf4ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -604,9 +604,11 @@ public class ZKRMStateStore extends RMStateStore { attemptStateData.getDiagnostics(), attemptStateData.getFinalApplicationStatus(), attemptStateData.getAMContainerExitStatus(), + attemptStateData.getFinishTime(), attemptStateData.getMemorySeconds(), attemptStateData.getVcoreSeconds()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index ad8cdae..63ef8f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -44,7 +44,7 @@ public abstract class ApplicationAttemptStateData { ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, String finalTrackingUrl, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus, - long memorySeconds, long vcoreSeconds) { + long finishTime, long memorySeconds, long vcoreSeconds) { ApplicationAttemptStateData attemptStateData = Records.newRecord(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); @@ -56,6 +56,7 @@ public abstract class ApplicationAttemptStateData { attemptStateData.setStartTime(startTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); + attemptStateData.setFinishTime(finishTime); 
attemptStateData.setMemorySeconds(memorySeconds); attemptStateData.setVcoreSeconds(vcoreSeconds); return attemptStateData; @@ -75,7 +76,7 @@ public abstract class ApplicationAttemptStateData { attemptState.getStartTime(), attemptState.getState(), attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), attemptState.getFinalApplicationStatus(), - attemptState.getAMContainerExitStatus(), + attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(), attemptState.getMemorySeconds(), attemptState.getVcoreSeconds()); } @@ -163,7 +164,15 @@ public abstract class ApplicationAttemptStateData { public abstract void setAMContainerExitStatus(int exitStatus); /** - * Get the <em>memory seconds</em> (in MB seconds) of the application. + * Get the <em>finish time</em> of the application attempt. + * @return <em>finish time</em> of the application attempt + */ + public abstract long getFinishTime(); + + public abstract void setFinishTime(long finishTime); + + /** + * Get the <em>memory seconds</em> (in MB seconds) of the application. 
* @return <em>memory seconds</em> (in MB seconds) of the application */ @Public http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 4d6212d..516af2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -318,4 +318,16 @@ public class ApplicationAttemptStateDataPBImpl extends private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { return ProtoUtils.convertFromProtoFormat(s); } + + @Override + public long getFinishTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2dd2040..815b86a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,10 +32,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -81,8 +81,12 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; 
import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { @@ -110,6 +114,12 @@ public class RMAppImpl implements RMApp, Recoverable { private final String applicationType; private final Set<String> applicationTags; + private final long attemptFailuresValidityInterval; + + private Clock systemClock; + + private boolean isNumAttemptsBeyondThreshold = false; + // Mutable fields private long startTime; private long finishTime = 0; @@ -328,6 +338,8 @@ public class RMAppImpl implements RMApp, Recoverable { ApplicationMasterService masterService, long submitTime, String applicationType, Set<String> applicationTags) { + this.systemClock = new SystemClock(); + this.applicationId = applicationId; this.name = name; this.rmContext = rmContext; @@ -340,7 +352,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.scheduler = scheduler; this.masterService = masterService; this.submitTime = submitTime; - this.startTime = System.currentTimeMillis(); + this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; @@ -358,6 +370,9 @@ public class RMAppImpl implements RMApp, Recoverable { this.maxAppAttempts = individualMaxAppAttempts; } + this.attemptFailuresValidityInterval = + submissionContext.getAttemptFailuresValidityInterval(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -897,7 +912,7 @@ public class RMAppImpl implements RMApp, Recoverable { msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". 
Failing the application."; - } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { + } else if (this.isNumAttemptsBeyondThreshold) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -930,7 +945,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppState stateToBeStored) { rememberTargetTransitions(event, transitionToDo, targetFinalState); this.stateBeforeFinalSaving = getState(); - this.storedFinishTime = System.currentTimeMillis(); + this.storedFinishTime = this.systemClock.getTime(); LOG.info("Updating application " + this.applicationId + " with final state: " + this.targetedFinalState); @@ -1097,7 +1112,7 @@ public class RMAppImpl implements RMApp, Recoverable { } app.finishTime = app.storedFinishTime; if (app.finishTime == 0 ) { - app.finishTime = System.currentTimeMillis(); + app.finishTime = app.systemClock.getTime(); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. @@ -1118,11 +1133,16 @@ public class RMAppImpl implements RMApp, Recoverable { private int getNumFailedAppAttempts() { int completedAttempts = 0; + long endTime = this.systemClock.getTime(); // Do not count AM preemption, hardware failures or NM resync // as attempt failure. 
for (RMAppAttempt attempt : attempts.values()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) { - completedAttempts++; + if (this.attemptFailuresValidityInterval <= 0 + || (attempt.getFinishTime() > endTime + - this.attemptFailuresValidityInterval)) { + completedAttempts++; + } } } return completedAttempts; @@ -1139,8 +1159,9 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + int numberOfFailure = app.getNumFailedAppAttempts(); if (!app.submissionContext.getUnmanagedAM() - && app.getNumFailedAppAttempts() < app.maxAppAttempts) { + && numberOfFailure < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1158,6 +1179,9 @@ public class RMAppImpl implements RMApp, Recoverable { } return initialState; } else { + if (numberOfFailure >= app.maxAppAttempts) { + app.isNumAttemptsBeyondThreshold = true; + } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); @@ -1244,4 +1268,10 @@ public class RMAppImpl implements RMApp, Recoverable { numNonAMContainerPreempted, numAMContainerPreempted, memorySeconds, vcoreSeconds); } + + @Private + @VisibleForTesting + public void setSystemClock(Clock clock) { + this.systemClock = clock; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..943a5e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -213,4 +213,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> { * @return metrics */ RMAppAttemptMetrics getRMAppAttemptMetrics(); + + /** + * the finish time of the application attempt. + * @return the finish time of the application attempt. + */ + long getFinishTime(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1489696..863130f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -42,7 +42,6 @@ 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -142,6 +140,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private long finishTime = 0; // Set to null initially. 
Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -739,6 +738,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); + this.finishTime = attemptState.getFinishTime(); this.attemptMetrics.updateAggregateAppResourceUsage( attemptState.getMemorySeconds(),attemptState.getVcoreSeconds()); } @@ -1028,11 +1028,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { AggregateAppResourceUsage resUsage = this.attemptMetrics.getAggregateAppResourceUsage(); RMStateStore rmStore = rmContext.getStateStore(); + setFinishTime(System.currentTimeMillis()); ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus, - resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); + getFinishTime(), resUsage.getMemorySeconds(), + resUsage.getVcoreSeconds()); LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState + ", and exit status: " + exitStatus); @@ -1747,4 +1749,23 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // lock return attemptMetrics; } + + @Override + public long getFinishTime() { + try { + this.readLock.lock(); + return this.finishTime; + } finally { + this.readLock.unlock(); + } + } + + private void setFinishTime(long finishTime) { + try { + this.writeLock.lock(); + this.finishTime = finishTime; + } finally { + this.writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 5125a27..4d29153 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -80,6 +80,7 @@ message ApplicationAttemptStateDataProto { optional int32 am_container_exit_status = 9 [default = -1000]; optional int64 memory_seconds = 10; optional int64 vcore_seconds = 11; + optional int64 finish_time = 12; } message EpochProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3817637..1338a6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -278,7 +278,16 @@ public class MockRM extends ResourceManager 
{ boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null); + false, null, 0); + } + + public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) + throws Exception { + return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, attemptFailuresValidityInterval); } public RMApp submitApp(int masterMemory, String name, String user, @@ -286,6 +295,17 @@ public class MockRM extends ResourceManager { int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, 0); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! 
isAppIdProvided) { @@ -321,6 +341,7 @@ public class MockRM extends ResourceManager { clc.setTokens(securityTokens); } sub.setAMContainerSpec(clc); + sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c5c818..fcb4e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Assert; import org.junit.Test; @@ -584,4 +587,128 @@ public class TestAMRestart { rm1.stop(); rm2.stop(); } + + @Test (timeout = 50000) + public void testRMAppAttemptFailuresValidityInterval() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + // explicitly set max-am-retry count as 2. + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // set window size to a larger number : 20s + // we will verify the app should be failed if + // two continuous attempts failed in 20s. 
+ RMApp app = rm1.submitApp(200, 20000); + + MockAM am = MockRM.launchAM(app, rm1, nm1); + // Fail current attempt normally + nm1.nodeHeartbeat(am.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FAILED); + // launch the second attempt + rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app.getAppAttempts().size()); + Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt()) + .mayBeLastAttempt()); + MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1); + am_2.waitForState(RMAppAttemptState.RUNNING); + nm1.nodeHeartbeat(am_2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am_2.waitForState(RMAppAttemptState.FAILED); + // current app should be failed. + rm1.waitForState(app.getApplicationId(), RMAppState.FAILED); + + ControlledClock clock = new ControlledClock(new SystemClock()); + // set window size to 6s + RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);; + app1.setSystemClock(clock); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Fail attempt1 normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + + // launch the second attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, app1.getAppAttempts().size()); + + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt()); + MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am2.waitForState(RMAppAttemptState.RUNNING); + + // wait for 6 seconds + clock.setTime(System.currentTimeMillis() + 6*1000); + // Fail attempt2 normally + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), + 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + + // can launch the third attempt successfully + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(3, 
app1.getAppAttempts().size()); + RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); + clock.reset(); + MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + am3.waitForState(RMAppAttemptState.RUNNING); + + // Restart rm. + @SuppressWarnings("resource") + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // re-register the NM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus status = Records.newRecord(NMContainerStatus.class); + status + .setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + status.setContainerId(attempt3.getMasterContainer().getId()); + status.setContainerState(ContainerState.COMPLETE); + status.setDiagnostics(""); + nm1.registerNode(Collections.singletonList(status), null); + + rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); + + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // Launch Attempt 4 + MockAM am4 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); + + // wait for 6 seconds + clock.setTime(System.currentTimeMillis() + 6*1000); + // Fail attempt4 normally + nm1 + .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am4.waitForState(RMAppAttemptState.FAILED); + + // can launch the 5th attempt successfully + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + MockAM am5 = + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1); + clock.reset(); + am5.waitForState(RMAppAttemptState.RUNNING); + + // Fail attempt5 normally + nm1 + .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am5.waitForState(RMAppAttemptState.FAILED); + + rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); + rm1.stop(); + rm2.stop(); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14e2639f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index a0ddc85..85022d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -318,7 +318,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 100, 0, 0); + FinalApplicationStatus.SUCCEEDED, 100, + oldAttemptState.getFinishTime(), 0, 0); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -341,7 +342,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 111, 0, 0); + FinalApplicationStatus.SUCCEEDED, 111, + oldAttemptState.getFinishTime(), 0, 0); store.updateApplicationAttemptState(dummyAttempt); // let things 
settle down