Author: vinodkv
Date: Wed Mar 12 21:18:55 2014
New Revision: 1576911
URL: http://svn.apache.org/r1576911
Log:
YARN-1816. Fixed ResourceManager to get RMApp correctly handle ATTEMPT_FINISHED
event at ACCEPTED state that can happen after RM restarts. Contributed by Jian
He.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1576911&r1=1576910&r2=1576911&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Mar 12 21:18:55 2014
@@ -457,6 +457,10 @@ Release 2.4.0 - UNRELEASED
and thus recover app itself synchronously and avoid races with resyncing
NodeManagers. (Jian He via vinodkv)
+ YARN-1816. Fixed ResourceManager to get RMApp correctly handle
+ ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts.
+ (Jian He via vinodkv)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1576911&r1=1576910&r2=1576911&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Mar 12 21:18:55 2014
@@ -189,11 +189,14 @@ public class RMAppImpl implements RMApp,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
- // ACCEPTED state is possible to receive ATTEMPT_FAILED event because
- // RMAppRecoveredTransition is returning ACCEPTED state directly and
- // waiting for the previous AM to exit.
+ // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
+ // event because RMAppRecoveredTransition is returning ACCEPTED state
+ // directly and waiting for the previous AM to exit.
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.ACCEPTED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_FINISHED,
+ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
@@ -725,11 +728,7 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- /*
- * If last attempt recovered final state is null .. it means attempt was
- * started but AM container may or may not have started / finished.
- * Therefore we should wait for it to finish.
- */
+
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
// synchronously recover attempt to ensure any incoming external events
// to be processed after the attempt processes the recover event.
@@ -744,6 +743,17 @@ public class RMAppImpl implements RMApp,
return app.recoveredFinalState;
}
+ // Last attempt is in final state, do not add to scheduler and just return
+ // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
+ // back.
+ if (app.currentAttempt != null
+ && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
+ || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
+ || (app.currentAttempt.getState() == RMAppAttemptState.FAILED
+ && app.attempts.size() == app.maxAppAttempts))) {
+ return RMAppState.ACCEPTED;
+ }
+
// Notify scheduler about the app on recovery
new AddApplicationToSchedulerTransition().transition(app, event);
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1576911&r1=1576910&r2=1576911&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Mar 12 21:18:55 2014
@@ -872,6 +872,11 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
+ /*
+ * If last attempt recovered final state is null .. it means attempt was
+ * started but AM container may or may not have started / finished.
+ * Therefore we should wait for it to finish.
+ */
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
@@ -1598,7 +1603,7 @@ public class RMAppAttemptImpl implements
ExitUtil.terminate(1, storeEvent.getStoredException());
}
}
-
+
private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1576911&r1=1576910&r2=1576911&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Mar 12 21:18:55 2014
@@ -601,7 +601,63 @@ public class TestRMRestart {
RMAppAttemptState.SCHEDULED);
Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
.getCurrentAppAttempt().getAppAttemptState());
+ }
+
+ // Test RM restarts after previous attempt succeeded and was saved into state
+ // store but before the RMAppAttempt notifies RMApp that it has succeeded. On
+ // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
+ // that RMApp can recover its state.
+ @Test
+ public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ int count = 0;
+
+ @Override
+ public void updateApplicationStateInternal(ApplicationId appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception {
+ if (count == 0) {
+ // do nothing; simulate app final state is not saved.
+ LOG.info(appId + " final state is not saved.");
+ count++;
+ } else {
+ super.updateApplicationStateInternal(appId, appStateData);
+ }
+ }
+ };
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+ FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ am0.unregisterAppAttempt(req, true);
+ am0.waitForState(RMAppAttemptState.FINISHING);
+ // app final state is not saved. This guarantees that RMApp cannot be
+ // recovered via its own saved state, but only via the event notification
+ // from the RMAppAttempt on recovery.
+ Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
+
+ // start RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ rm2.start();
+
+ rm2.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.FINISHED);
+ rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+ // app final state is saved via the finish event from attempt.
+ Assert.assertEquals(RMAppState.FINISHED,
+ rmAppState.get(app0.getApplicationId()).getState());
}
@Test