Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -64,6 +66,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -115,6 +118,7 @@ public class RMAppImpl implements RMApp, private EventHandler handler; private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -162,6 +166,8 @@ public class RMAppImpl implements RMApp, RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.MOVE, new RMAppMoveTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, @@ -179,7 +185,6 @@ public class RMAppImpl implements RMApp, new FinalSavingTransition( new AppKilledTransition(), RMAppState.KILLED)) - // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) @@ -199,12 +204,9 @@ public class RMAppImpl implements RMApp, new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -219,6 +221,9 @@ public class RMAppImpl implements RMApp, .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, // UnManagedAM directly jumps to finished RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.RUNNING, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, @@ -234,22 +239,31 @@ public class RMAppImpl implements RMApp, .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_FINISHED, new AttemptFinishedAtFinalSavingTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED)) + RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, - // ignore Kill as we have already saved the final Finished state in - // state store. - RMAppEventType.KILL)) + // ignore Kill/Move as we have already saved the final Finished state + // in state store. + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from KILLING state + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -262,42 +276,47 @@ public class RMAppImpl implements RMApp, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FINISHED state // ignorable transitions + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, EnumSet.of( RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FAILED state // ignorable transitions + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, + RMAppEventType.MOVE)) // Transitions from KILLED state // ignorable transitions + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition( RMAppState.KILLED, RMAppState.KILLED, EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE)) + RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) .installTopology(); private final StateMachine<RMAppState, RMAppEventType, RMAppEvent> stateMachine; - private static final ApplicationResourceUsageReport - DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = - BuilderUtils.newApplicationResourceUsageReport(-1, -1, - Resources.createResource(-1, -1), Resources.createResource(-1, -1), - Resources.createResource(-1, -1)); private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1; public RMAppImpl(ApplicationId applicationId, RMContext rmContext, @@ -498,7 +517,7 @@ public class RMAppImpl implements RMApp, String origTrackingUrl = UNAVAILABLE; int rpcPort = -1; ApplicationResourceUsageReport appUsageReport = - DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; + RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; FinalApplicationStatus finishState = getFinalApplicationStatus(); String diags = UNAVAILABLE; float progress = 0.0f; @@ -666,7 +685,12 @@ public class RMAppImpl implements RMApp, ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, maxAppAttempts == attempts.size()); + submissionContext, conf, + // The newly created attempt maybe last attempt if (number of + // previously failed attempts(which should not include Preempted, + // hardware error and NM resync) + 1) equal to the max-attempt + // limit. + maxAppAttempts == (getNumFailedAppAttempts() + 1)); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -699,6 +723,23 @@ public class RMAppImpl implements RMApp, nodeUpdateEvent.getNode()); }; } + + private static final class AppRunningOnNodeTransition extends RMAppTransition { + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; + + // if final state already stored, notify RMNode + if (isAppInFinalState(app)) { + app.handler.handle( + new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent + .getApplicationId())); + return; + } + + // otherwise, add it to ranNodes for further process + app.ranNodes.add(nodeAddedEvent.getNodeId()); + }; + } /** * Move an app to a new queue. @@ -723,46 +764,54 @@ public class RMAppImpl implements RMApp, } } + // synchronously recover attempt to ensure any incoming external events + // to be processed after the attempt processes the recover event. + private void recoverAppAttempts() { + for (RMAppAttempt attempt : getAppAttempts().values()) { + attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + } + private static final class RMAppRecoveredTransition implements MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - for (RMAppAttempt attempt : app.getAppAttempts().values()) { - // synchronously recover attempt to ensure any incoming external events - // to be processed after the attempt processes the recover event. - attempt.handle( - new RMAppAttemptEvent(attempt.getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); - } - // The app has completed. if (app.recoveredFinalState != null) { + app.recoverAppAttempts(); new FinalTransition(app.recoveredFinalState).transition(app, event); return app.recoveredFinalState; } - // Last attempt is in final state, do not add to scheduler and just return - // ACCEPTED waiting for last RMAppAttempt to send finished or failed event - // back. + // No existent attempts means the attempt associated with this app was not + // started or started but not yet saved. + if (app.attempts.isEmpty()) { + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user)); + return RMAppState.SUBMITTED; + } + + // Add application to scheduler synchronously to guarantee scheduler + // knows applications before AM or NM re-registers. + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user, true)); + + // recover attempts + app.recoverAppAttempts(); + + // Last attempt is in final state, return ACCEPTED waiting for last + // RMAppAttempt to send finished or failed event back. if (app.currentAttempt != null && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED || (app.currentAttempt.getState() == RMAppAttemptState.FAILED - && app.attempts.size() == app.maxAppAttempts))) { + && app.getNumFailedAppAttempts() == app.maxAppAttempts))) { return RMAppState.ACCEPTED; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); - - // No existent attempts means the attempt associated with this app was not - // started or started but not yet saved. - if (app.attempts.isEmpty()) { - return RMAppState.SUBMITTED; - } - // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. @@ -774,17 +823,6 @@ public class RMAppImpl implements RMApp, RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - if (event instanceof RMAppNewSavedEvent) { - RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event; - // For HA this exception needs to be handled by giving up - // master status if we got fenced - if (((RMAppNewSavedEvent) event).getStoredException() != null) { - LOG.error( - "Failed to store application: " + storeEvent.getApplicationId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user)); } @@ -802,13 +840,6 @@ public class RMAppImpl implements RMApp, @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application" - + storeEvent.getApplicationId(), storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, app.eventCausingFinalSaving); @@ -844,7 +875,7 @@ public class RMAppImpl implements RMApp, msg = "Unmanaged application " + this.getApplicationId() + " failed due to " + failedEvent.getDiagnostics() + ". Failing the application."; - } else if (this.attempts.size() >= this.maxAppAttempts) { + } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() + ". Failing the application."; @@ -1037,17 +1068,8 @@ public class RMAppImpl implements RMApp, this.finalState = finalState; } - private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) { - Set<NodeId> nodes = new HashSet<NodeId>(); - for (RMAppAttempt attempt : app.attempts.values()) { - nodes.addAll(attempt.getRanNodes()); - } - return nodes; - } - public void transition(RMAppImpl app, RMAppEvent event) { - Set<NodeId> nodes = getNodesOnWhichAttemptRan(app); - for (NodeId nodeId : nodes) { + for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } @@ -1055,8 +1077,12 @@ public class RMAppImpl implements RMApp, if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + // Recovered apps that are completed were not added to scheduler, so no + // need to remove them from scheduler. + if (app.recoveredFinalState == null) { + app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, + finalState)); + } app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); @@ -1066,6 +1092,18 @@ public class RMAppImpl implements RMApp, }; } + private int getNumFailedAppAttempts() { + int completedAttempts = 0; + // Do not count AM preemption, hardware failures or NM resync + // as attempt failure. + for (RMAppAttempt attempt : attempts.values()) { + if (attempt.shouldCountTowardsMaxAttemptRetry()) { + completedAttempts++; + } + } + return completedAttempts; + } + private static final class AttemptFailedTransition implements MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { @@ -1077,8 +1115,9 @@ public class RMAppImpl implements RMApp, @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + if (!app.submissionContext.getUnmanagedAM() - && app.attempts.size() < app.maxAppAttempts) { + && app.getNumFailedAppAttempts() < app.maxAppAttempts) { boolean transferStateFromPreviousAttempt = false; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = @@ -1137,6 +1176,9 @@ public class RMAppImpl implements RMApp, public static boolean isAppInFinalState(RMApp rmApp) { RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); + if (appState == null) { + appState = rmApp.getState(); + } return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } @@ -1144,4 +1186,30 @@ public class RMAppImpl implements RMApp, private RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } + + @Override + public Set<NodeId> getRanNodes() { + return ranNodes; + } + + @Override + public RMAppMetrics getRMAppMetrics() { + Resource resourcePreempted = Resource.newInstance(0, 0); + int numAMContainerPreempted = 0; + int numNonAMContainerPreempted = 0; + for (RMAppAttempt attempt : attempts.values()) { + if (null != attempt) { + RMAppAttemptMetrics attemptMetrics = + attempt.getRMAppAttemptMetrics(); + Resources.addTo(resourcePreempted, + attemptMetrics.getResourcePreempted()); + numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0; + numNonAMContainerPreempted += + attemptMetrics.getNumNonAMContainersPreempted(); + } + } + + return new RMAppMetrics(resourcePreempted, + numNonAMContainerPreempted, numAMContainerPreempted); + } }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Tue Aug 19 23:49:39 2014 @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; -import java.util.Set; import javax.crypto.SecretKey; @@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -115,12 +113,6 @@ public interface RMAppAttempt extends Ev FinalApplicationStatus getFinalApplicationStatus(); /** - * Nodes on which the containers for this {@link RMAppAttempt} ran. - * @return the set of nodes that ran any containers from this {@link RMAppAttempt} - */ - Set<NodeId> getRanNodes(); - - /** * Return a list of the last set of finished containers, resetting the * finished containers to empty. * @return the list of just finished containers, re setting the finished containers. @@ -204,4 +196,21 @@ public interface RMAppAttempt extends Ev */ ApplicationAttemptReport createApplicationAttemptReport(); + /** + * Return the flag which indicates whether the attempt failure should be + * counted to attempt retry count. + * <ul> + * There failure types should not be counted to attempt retry count: + * <li>preempted by the scheduler.</li> + * <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li> + * <li>killed by RM because of RM restart or failover.</li> + * </ul> + */ + boolean shouldCountTowardsMaxAttemptRetry(); + + /** + * Get metrics from the {@link RMAppAttempt} + * @return metrics + */ + RMAppAttemptMetrics getRMAppAttemptMetrics(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Tue Aug 19 23:49:39 2014 @@ -36,7 +36,6 @@ public enum RMAppAttemptEventType { UNREGISTERED, // Source: Containers - CONTAINER_ACQUIRED, CONTAINER_ALLOCATED, CONTAINER_FINISHED, Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Aug 19 23:49:39 2014 @@ -26,20 +26,18 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -51,12 +49,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; @@ -80,15 +77,13 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({"unchecked", "rawtypes"}) public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements private final ApplicationSubmissionContext submissionContext; private Token<AMRMTokenIdentifier> amrmToken = null; private SecretKey clientTokenMasterKey = null; - - //nodes on while this attempt's containers ran - private Set<NodeId> ranNodes = - new HashSet<NodeId>(); + private List<ContainerStatus> justFinishedContainers = new ArrayList<ContainerStatus>(); private Container masterContainer; @@ -152,9 +146,15 @@ public class RMAppAttemptImpl implements // if an RMAppAttemptUnregistrationEvent occurs private FinalApplicationStatus finalStatus = null; private final StringBuilder diagnostics = new StringBuilder(); + private int amContainerExitStatus = ContainerExitStatus.INVALID; private Configuration conf; - private final boolean isLastAttempt; + // Since AM preemption, hardware error and NM resync are not counted towards + // AM failure count, even if this flag is true, a new attempt can still be + // re-created if this attempt is eventually failed because of preemption, + // hardware error or NM resync. So this flag indicates that this may be + // last attempt. + private final boolean maybeLastAttempt; private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); @@ -163,6 +163,8 @@ public class RMAppAttemptImpl implements private RMAppAttemptState recoveredFinalState; private RMAppAttemptState stateBeforeFinalSaving; private Object transitionTodo; + + private RMAppAttemptMetrics attemptMetrics = null; private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, @@ -214,21 +216,30 @@ public class RMAppAttemptImpl implements RMAppAttemptEventType.KILL, new FinalSavingTransition(new BaseFinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) - + .addTransition(RMAppAttemptState.SCHEDULED, + RMAppAttemptState.FINAL_SAVING, + RMAppAttemptEventType.CONTAINER_FINISHED, + new FinalSavingTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.FAILED)) + // Transitions from ALLOCATED_SAVING State .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) - .addTransition(RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) + // App could be killed by the client. So need to handle this. .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.KILL, new FinalSavingTransition(new BaseFinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.FINAL_SAVING, + RMAppAttemptEventType.CONTAINER_FINISHED, + new FinalSavingTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.FAILED)) // Transitions from LAUNCHED_UNMANAGED_SAVING State .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, @@ -249,10 +260,6 @@ public class RMAppAttemptImpl implements RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) // Transitions from ALLOCATED State - .addTransition(RMAppAttemptState.ALLOCATED, - RMAppAttemptState.ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, @@ -267,15 +274,17 @@ public class RMAppAttemptImpl implements .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.CONTAINER_FINISHED, new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) - .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new ContainerFinishedTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.LAUNCHED)) .addTransition( RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -295,14 +304,12 @@ public class RMAppAttemptImpl implements .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, RMAppAttemptEventType.CONTAINER_ALLOCATED) .addTransition( - RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) - .addTransition( RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedTransition()) + new ContainerFinishedTransition( + new AMContainerCrashedAtRunningTransition(), + RMAppAttemptState.RUNNING)) .addTransition( RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -333,7 +340,6 @@ public class RMAppAttemptImpl implements // should be fixed to reject container allocate request at Final // Saving in scheduler RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, RMAppAttemptEventType.KILL)) @@ -343,7 +349,7 @@ public class RMAppAttemptImpl implements // use by the next new attempt. .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedAtFailedTransition()) + new ContainerFinishedAtFinalStateTransition()) .addTransition( RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, @@ -379,31 +385,36 @@ public class RMAppAttemptImpl implements RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_FINISHED, RMAppAttemptEventType.KILL)) + .addTransition(RMAppAttemptState.FINISHED, + RMAppAttemptState.FINISHED, + RMAppAttemptEventType.CONTAINER_FINISHED, + new ContainerFinishedAtFinalStateTransition()) // Transitions from KILLED State .addTransition( RMAppAttemptState.KILLED, RMAppAttemptState.KILLED, EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED, - RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.LAUNCHED, RMAppAttemptEventType.LAUNCH_FAILED, RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.REGISTERED, RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_FINISHED, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) + .addTransition(RMAppAttemptState.KILLED, + RMAppAttemptState.KILLED, + RMAppAttemptEventType.CONTAINER_FINISHED, + new ContainerFinishedAtFinalStateTransition()) .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, boolean isLastAttempt) { + Configuration conf, boolean maybeLastAttempt) { this.conf = conf; this.applicationAttemptId = appAttemptId; this.rmContext = rmContext; @@ -417,8 +428,9 @@ public class RMAppAttemptImpl implements this.writeLock = lock.writeLock(); this.proxiedTrackingUrl = generateProxyUriWithScheme(null); - this.isLastAttempt = isLastAttempt; + this.maybeLastAttempt = maybeLastAttempt; this.stateMachine = stateMachineFactory.make(this); + this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId); } @Override @@ -526,7 +538,7 @@ public class RMAppAttemptImpl implements private void setTrackingUrlToRMAppPage() { originalTrackingUrl = pjoin( - WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf), + WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app", getAppAttemptId().getApplicationId()); proxiedTrackingUrl = originalTrackingUrl; } @@ -545,7 +557,22 @@ public class RMAppAttemptImpl implements @Override public Token<AMRMTokenIdentifier> getAMRMToken() { - return this.amrmToken; + this.readLock.lock(); + try { + return this.amrmToken; + } finally { + this.readLock.unlock(); + } + } + + @Private + public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) { + this.writeLock.lock(); + try { + this.amrmToken = lastToken; + } finally { + this.writeLock.unlock(); + } } @Override @@ -579,6 +606,15 @@ public class RMAppAttemptImpl implements } } + public int getAMContainerExitStatus() { + this.readLock.lock(); + try { + return this.amContainerExitStatus; + } finally { + this.readLock.unlock(); + } + } + @Override public float getProgress() { this.readLock.lock(); @@ -616,11 +652,6 @@ public class RMAppAttemptImpl implements } @Override - public Set<NodeId> getRanNodes() { - return ranNodes; - } - - @Override public Container getMasterContainer() { this.readLock.lock(); @@ -671,9 +702,7 @@ public class RMAppAttemptImpl implements ApplicationResourceUsageReport report = scheduler.getAppResourceUsageReport(this.getAppAttemptId()); if (report == null) { - Resource none = Resource.newInstance(0, 0); - report = ApplicationResourceUsageReport.newInstance(0, 0, none, none, - none); + report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; } return report; } finally { @@ -692,8 +721,13 @@ public class RMAppAttemptImpl implements + attemptState.getState()); diagnostics.append("Attempt recovered after RM restart"); diagnostics.append(attemptState.getDiagnostics()); + this.amContainerExitStatus = attemptState.getAMContainerExitStatus(); + if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) { + this.attemptMetrics.setIsPreempted(); + } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); + recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -703,12 +737,13 @@ public class RMAppAttemptImpl implements public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); - this.ranNodes = attempt.getRanNodes(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens) - throws IOException { - if (appAttemptTokens == null) { + private void recoverAppAttemptCredentials(Credentials appAttemptTokens, + RMAppAttemptState state) throws IOException { + if (appAttemptTokens == null || state == RMAppAttemptState.FAILED + || state == RMAppAttemptState.FINISHED + || state == RMAppAttemptState.KILLED) { return; } @@ -719,12 +754,9 @@ public class RMAppAttemptImpl implements .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); } - // Only one AMRMToken is stored per-attempt, so this should be fine. Can't - // use TokenSelector as service may change - think fail-over. this.amrmToken = - (Token<AMRMTokenIdentifier>) appAttemptTokens - .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); } private static class BaseTransition implements @@ -760,13 +792,6 @@ public class RMAppAttemptImpl implements .createMasterKey(appAttempt.applicationAttemptId); } - // create AMRMToken - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); - appAttempt.amrmToken = - new Token<AMRMTokenIdentifier>(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); - // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( @@ -838,7 +863,10 @@ public class RMAppAttemptImpl implements // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers() - .get(0)); + .get(0)); + RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler + .getRMContainer(appAttempt.getMasterContainer().getId()); + rmMasterContainer.setAMContainer(true); // The node set in NMTokenSecrentManager is used for marking whether the // NMToken has been issued for this node to the AM. // When AM container was allocated to RM itself, the node which allocates @@ -875,7 +903,6 @@ public class RMAppAttemptImpl implements @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); appAttempt.launchAttempt(); } } @@ -904,6 +931,18 @@ public class RMAppAttemptImpl implements } return appAttempt.recoveredFinalState; } else { + // Add the current attempt to the scheduler. + if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + + // Add attempt to scheduler synchronously to guarantee scheduler + // knows attempts before AM or NM re-registers. + appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.getAppAttemptId(), false, true)); + } + /* * Since the application attempt's final state is not saved that means * for AM container (previous attempt) state must be one of these. @@ -947,7 +986,7 @@ public class RMAppAttemptImpl implements String diags = null; String finalTrackingUrl = null; FinalApplicationStatus finalStatus = null; - + int exitStatus = ContainerExitStatus.INVALID; switch (event.getType()) { case LAUNCH_FAILED: RMAppAttemptLaunchFailedEvent launchFaileEvent = @@ -968,6 +1007,7 @@ public class RMAppAttemptImpl implements RMAppAttemptContainerFinishedEvent finishEvent = (RMAppAttemptContainerFinishedEvent) event; diags = getAMContainerCrashedDiagnostics(finishEvent); + exitStatus = finishEvent.getContainerStatus().getExitStatus(); break; case KILL: break; @@ -982,9 +1022,10 @@ public class RMAppAttemptImpl implements ApplicationAttemptState attemptState = new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus); + stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus); LOG.info("Updating application attempt " + applicationAttemptId - + " with final state: " + targetedFinalState); + + " with final state: " + targetedFinalState + ", and exit status: " + + exitStatus); rmStore.updateApplicationAttemptState(attemptState); } @@ -1013,14 +1054,6 @@ public class RMAppAttemptImpl implements @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application attempt: " - + storeEvent.getApplicationAttemptId(), - storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; if (appAttempt.transitionTodo instanceof SingleArcTransition) { @@ -1077,11 +1110,20 @@ public class RMAppAttemptImpl implements // don't leave the tracking URL pointing to a non-existent AM appAttempt.setTrackingUrlToRMAppPage(); appAttempt.invalidateAMHostAndPort(); + if (appAttempt.submissionContext .getKeepContainersAcrossApplicationAttempts() - && !appAttempt.isLastAttempt && !appAttempt.submissionContext.getUnmanagedAM()) { - keepContainersAcrossAppAttempts = true; + // See if we should retain containers for non-unmanaged applications + if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) { + // Premption, hardware failures, NM resync doesn't count towards + // app-failures and so we should retain containers. + keepContainersAcrossAppAttempts = true; + } else if (!appAttempt.maybeLastAttempt) { + // Not preemption, hardware failures or NM resync. + // Not last-attempt too - keep containers. + keepContainersAcrossAppAttempts = true; + } } appEvent = new RMAppFailedAttemptEvent(applicationId, @@ -1121,18 +1163,31 @@ public class RMAppAttemptImpl implements appAttempt.getClientTokenMasterKey()); } } - + + @Override + public boolean shouldCountTowardsMaxAttemptRetry() { + try { + this.readLock.lock(); + int exitStatus = getAMContainerExitStatus(); + return !(exitStatus == ContainerExitStatus.PREEMPTED + || exitStatus == ContainerExitStatus.ABORTED + || exitStatus == ContainerExitStatus.DISKS_FAILED + || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); + } finally { + this.readLock.unlock(); + } + } + private static final class UnmanagedAMAttemptSavedTransition extends AMLaunchedTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); - // TODO Today unmanaged AM client is waiting for app state to be Accepted to - // launch the AM. This is broken since we changed to start the attempt - // after the application is Accepted. We may need to introduce an attempt - // report that client can rely on to query the attempt state and choose to - // launch the unmanaged AM. + // create AMRMToken + appAttempt.amrmToken = + appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.applicationAttemptId); + super.transition(appAttempt, event); } } @@ -1207,17 +1262,16 @@ public class RMAppAttemptImpl implements } } - private static final class AMContainerCrashedTransition extends + private static final class AMContainerCrashedBeforeRunningTransition extends BaseFinalTransition { - public AMContainerCrashedTransition() { + public AMContainerCrashedBeforeRunningTransition() { super(RMAppAttemptState.FAILED); } @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent finishEvent = ((RMAppAttemptContainerFinishedEvent)event); @@ -1225,23 +1279,39 @@ public class RMAppAttemptImpl implements appAttempt.rmContext.getAMLivelinessMonitor().unregister( appAttempt.getAppAttemptId()); - // Setup diagnostic message - appAttempt.diagnostics - .append(getAMContainerCrashedDiagnostics(finishEvent)); + // Setup diagnostic message and exit status + appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); + // Tell the app, scheduler super.transition(appAttempt, finishEvent); } } - private static String getAMContainerCrashedDiagnostics( + private void setAMContainerCrashedDiagnosticsAndExitStatus( + RMAppAttemptContainerFinishedEvent finishEvent) { + ContainerStatus status = finishEvent.getContainerStatus(); + String diagnostics = getAMContainerCrashedDiagnostics(finishEvent); + this.diagnostics.append(diagnostics); + this.amContainerExitStatus = status.getExitStatus(); + } + + private String getAMContainerCrashedDiagnostics( RMAppAttemptContainerFinishedEvent finishEvent) { ContainerStatus status = finishEvent.getContainerStatus(); - String diagnostics = - "AM Container for " + finishEvent.getApplicationAttemptId() - + " exited with " + " exitCode: " + status.getExitStatus() - + " due to: " + status.getDiagnostics() + "." - + "Failing this attempt."; - return diagnostics; + StringBuilder diagnosticsBuilder = new StringBuilder(); + diagnosticsBuilder.append("AM Container for ").append( + finishEvent.getApplicationAttemptId()).append( + " exited with ").append(" exitCode: ").append(status.getExitStatus()). + append("\n"); + if (this.getTrackingUrl() != null) { + diagnosticsBuilder.append("For more detailed output,").append( + " check application tracking page:").append( + this.getTrackingUrl()).append( + "Then, click on links to logs of each attempt.\n"); + } + diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics()) + .append("Failing this attempt"); + return diagnosticsBuilder.toString(); } private static class FinalTransition extends BaseFinalTransition { @@ -1395,27 +1465,26 @@ public class RMAppAttemptImpl implements finalStatus = unregisterEvent.getFinalApplicationStatus(); } - private static final class ContainerAcquiredTransition extends - BaseTransition { - @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - RMAppAttemptContainerAcquiredEvent acquiredEvent - = (RMAppAttemptContainerAcquiredEvent) event; - appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId()); - } - } - private static final class ContainerFinishedTransition implements MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { + // The transition To Do after attempt final state is saved. + private BaseTransition transitionToDo; + private RMAppAttemptState currentState; + + public ContainerFinishedTransition(BaseTransition transitionToDo, + RMAppAttemptState currentState) { + this.transitionToDo = transitionToDo; + this.currentState = currentState; + } + @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent containerFinishedEvent - = (RMAppAttemptContainerFinishedEvent) event; + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) event; ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); @@ -1423,27 +1492,28 @@ public class RMAppAttemptImpl implements // the AMContainer, AppAttempt fails if (appAttempt.masterContainer != null && appAttempt.masterContainer.getId().equals( - containerStatus.getContainerId())) { + containerStatus.getContainerId())) { + // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - new ContainerFinishedFinalStateSavedTransition(), - RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); return RMAppAttemptState.FINAL_SAVING; } - // Normal container.Put it in completedcontainers list + // Normal container.Put it in completed containers list appAttempt.justFinishedContainers.add(containerStatus); - return RMAppAttemptState.RUNNING; + return this.currentState; } } - private static final class ContainerFinishedAtFailedTransition + private static final class ContainerFinishedAtFinalStateTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { RMAppAttemptContainerFinishedEvent containerFinishedEvent = (RMAppAttemptContainerFinishedEvent) event; + ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list @@ -1451,18 +1521,17 @@ public class RMAppAttemptImpl implements } } - private static class ContainerFinishedFinalStateSavedTransition extends + private static class AMContainerCrashedAtRunningTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent containerFinishedEvent = + RMAppAttemptContainerFinishedEvent finishEvent = (RMAppAttemptContainerFinishedEvent) event; // container associated with AM. must not be unmanaged assert appAttempt.submissionContext.getUnmanagedAM() == false; - // Setup diagnostic message - appAttempt.diagnostics - .append(getAMContainerCrashedDiagnostics(containerFinishedEvent)); + // Setup diagnostic message and exit status + appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, event); } @@ -1606,18 +1675,6 @@ public class RMAppAttemptImpl implements rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); } - private void checkAttemptStoreError(RMAppAttemptEvent event) { - RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event; - if(storeEvent.getStoredException() != null) - { - // This needs to be handled for HA and give up master status if we got - // fenced - LOG.error("Failed to store attempt: " + getAppAttemptId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } - private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved @@ -1663,4 +1720,16 @@ public class RMAppAttemptImpl implements } return attemptReport; } + + // for testing + public boolean mayBeLastAttempt() { + return maybeLastAttempt; + } + + @Override + public RMAppAttemptMetrics getRMAppAttemptMetrics() { + // didn't use read/write lock here because RMAppAttemptMetrics has its own + // lock + return attemptMetrics; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; /** @@ -58,7 +61,7 @@ public interface RMContainer extends Eve Priority getAllocatedPriority(); - long getStartTime(); + long getCreationTime(); long getFinishTime(); @@ -71,5 +74,9 @@ public interface RMContainer extends Eve ContainerState getContainerState(); ContainerReport createContainerReport(); + + boolean isAMContainer(); + + List<ResourceRequest> getResourceRequests(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Tue Aug 19 23:49:39 2014 @@ -33,5 +33,7 @@ public enum RMContainerEventType { RELEASED, // Source: ContainerAllocationExpirer - EXPIRE + EXPIRE, + + RECOVER } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Tue Aug 19 23:49:39 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -27,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -34,13 +36,17 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -65,6 +71,9 @@ public class RMContainerImpl implements RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, + EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), + RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, @@ -147,27 +156,35 @@ public class RMContainerImpl implements private Resource reservedResource; private NodeId reservedNode; private Priority reservedPriority; - private long startTime; + private long creationTime; private long finishTime; private ContainerStatus finishedStatus; + private boolean isAMContainer; + private List<ResourceRequest> resourceRequests; - - + public RMContainerImpl(Container container, + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext) { + this(container, appAttemptId, nodeId, user, rmContext, System + .currentTimeMillis()); + } public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, - String user, RMContext rmContext) { + String user, RMContext rmContext, long creationTime) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; this.container = container; this.appAttemptId = appAttemptId; this.user = user; - this.startTime = System.currentTimeMillis(); + this.creationTime = creationTime; this.rmContext = rmContext; this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); - + this.isAMContainer = false; + this.resourceRequests = null; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -232,8 +249,8 @@ public class RMContainerImpl implements } @Override - public long getStartTime() { - return startTime; + public long getCreationTime() { + return creationTime; } @Override @@ -298,6 +315,25 @@ public class RMContainerImpl implements readLock.unlock(); } } + + @Override + public List<ResourceRequest> getResourceRequests() { + try { + readLock.lock(); + return resourceRequests; + } finally { + readLock.unlock(); + } + } + + public void setResourceRequests(List<ResourceRequest> requests) { + try { + writeLock.lock(); + this.resourceRequests = requests; + } finally { + writeLock.unlock(); + } + } @Override public String toString() { @@ -305,6 +341,25 @@ public class RMContainerImpl implements } @Override + public boolean isAMContainer() { + try { + readLock.lock(); + return isAMContainer; + } finally { + readLock.unlock(); + } + } + + public void setAMContainer(boolean isAMContainer) { + try { + writeLock.lock(); + this.isAMContainer = isAMContainer; + } finally { + writeLock.unlock(); + } + } + + @Override public void handle(RMContainerEvent event) { LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType()); try { @@ -341,6 +396,38 @@ public class RMContainerImpl implements } } + private static final class ContainerRecoveredTransition + implements + MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> { + @Override + public RMContainerState transition(RMContainerImpl container, + RMContainerEvent event) { + NMContainerStatus report = + ((RMContainerRecoverEvent) event).getContainerReport(); + if (report.getContainerState().equals(ContainerState.COMPLETE)) { + ContainerStatus status = + ContainerStatus.newInstance(report.getContainerId(), + report.getContainerState(), report.getDiagnostics(), + report.getContainerExitStatus()); + + new FinishedTransition().transition(container, + new RMContainerFinishedEvent(container.containerId, status, + RMContainerEventType.FINISHED)); + return RMContainerState.COMPLETED; + } else if (report.getContainerState().equals(ContainerState.RUNNING)) { + // Tell the app + container.eventHandler.handle(new RMAppRunningOnNodeEvent(container + .getApplicationAttemptId().getApplicationId(), container.nodeId)); + return RMContainerState.RUNNING; + } else { + // This can never happen. + LOG.warn("RMContainer received unexpected recover event with container" + + " state " + report.getContainerState() + " while recovering."); + return RMContainerState.RUNNING; + } + } + } + private static final class ContainerReservedTransition extends BaseTransition { @@ -368,12 +455,15 @@ public class RMContainerImpl implements @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Clear ResourceRequest stored in RMContainer + container.setResourceRequests(null); + // Register with containerAllocationExpirer. container.containerAllocationExpirer.register(container.getContainerId()); - // Tell the appAttempt - container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( - container.getApplicationAttemptId(), container.getContainer())); + // Tell the app + container.eventHandler.handle(new RMAppRunningOnNodeEvent(container + .getApplicationAttemptId().getApplicationId(), container.nodeId)); } } @@ -396,11 +486,30 @@ public class RMContainerImpl implements container.finishTime = System.currentTimeMillis(); container.finishedStatus = finishedEvent.getRemoteContainerStatus(); // Inform AppAttempt + // container.getContainer() can return null when a RMContainer is a + // reserved container + updateMetricsIfPreempted(container); + container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus())); - container.rmContext.getRMApplicationHistoryWriter() - .containerFinished(container); + container.rmContext.getRMApplicationHistoryWriter().containerFinished( + container); + } + + private static void updateMetricsIfPreempted(RMContainerImpl container) { + // If this is a preempted container, update preemption metrics + if (ContainerExitStatus.PREEMPTED == container.finishedStatus + .getExitStatus()) { + + Resource resource = container.getContainer().getResource(); + RMAppAttempt rmAttempt = + container.rmContext.getRMApps() + .get(container.getApplicationAttemptId().getApplicationId()) + .getCurrentAppAttempt(); + rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, + container); + } } } @@ -442,7 +551,7 @@ public class RMContainerImpl implements try { containerReport = ContainerReport.newInstance(this.getContainerId(), this.getAllocatedResource(), this.getAllocatedNode(), - this.getAllocatedPriority(), this.getStartTime(), + this.getAllocatedPriority(), this.getCreationTime(), this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(), this.getContainerExitStatus(), this.getContainerState()); } finally { @@ -450,5 +559,4 @@ public class RMContainerImpl implements } return containerReport; } - } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 19 23:49:39 2014 @@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.EnumSet; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -48,12 +47,15 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -91,9 +93,9 @@ public class RMNodeImpl implements RMNod private final RMContext context; private final String hostName; private final int commandPort; - private final int httpPort; + private int httpPort; private final String nodeAddress; // The containerManager address - private final String httpAddress; + private String httpAddress; private volatile ResourceOption resourceOption; private final Node node; @@ -102,8 +104,8 @@ public class RMNodeImpl implements RMNod private String nodeManagerVersion; /* set of containers that have just launched */ - private final Map<ContainerId, ContainerStatus> justLaunchedContainers = - new HashMap<ContainerId, ContainerStatus>(); + private final Set<ContainerId> launchedContainers = + new HashSet<ContainerId>(); /* set of containers that need to be cleaned */ private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( @@ -454,19 +456,33 @@ public class RMNodeImpl implements RMNod } } + private static void handleRunningAppOnNode(RMNodeImpl rmNode, + RMContext context, ApplicationId appId, NodeId nodeId) { + RMApp app = context.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // add the app to the finishedApplications list so that the app can be + // cleaned up on the NM + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId + + ", just added it to finishedApplications list for cleanup"); + rmNode.finishedApplications.add(appId); + return; + } + + context.getDispatcher().getEventHandler() + .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); + } + public static class AddNodeTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; + List<NMContainerStatus> containers = null; - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -476,44 +492,51 @@ public class RMNodeImpl implements RMNod } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getNMContainerStatuses(); + if (containers != null && !containers.isEmpty()) { + for (NMContainerStatus container : containers) { + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } + } + } } + + if (null != startEvent.getRunningApplications()) { + for (ApplicationId appId : startEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); + } + } + + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } - + public static class ReconnectNodeTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - - RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode(); - if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) - && rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); + RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; + RMNode newNode = reconnectEvent.getReconnectedNode(); + rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.resourceOption = newNode.getResourceOption(); + + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + + if (null != reconnectEvent.getRunningApplications()) { + for (ApplicationId appId : reconnectEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); } + rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); @@ -633,14 +656,14 @@ public class RMNodeImpl implements RMNod // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!rmNode.justLaunchedContainers.containsKey(containerId)) { + if (!rmNode.launchedContainers.contains(containerId)) { // Just launched container. RM knows about it the first time. - rmNode.justLaunchedContainers.put(containerId, remoteContainer); + rmNode.launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); } } else { // A finished container - rmNode.justLaunchedContainers.remove(containerId); + rmNode.launchedContainers.remove(containerId); completedContainers.add(remoteContainer); } } @@ -717,4 +740,10 @@ public class RMNodeImpl implements RMNod public int getQueueSize() { return nodeUpdateQueue.size(); } + + // For test only. + @VisibleForTesting + public Set<ContainerId> getLaunchedContainers() { + return this.launchedContainers; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Tue Aug 19 23:49:39 2014 @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; public class RMNodeReconnectEvent extends RMNodeEvent { private RMNode reconnectedNode; + private List<ApplicationId> runningApplications; - public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) { + public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode, + List<ApplicationId> runningApps) { super(nodeId, RMNodeEventType.RECONNECTED); reconnectedNode = newNode; + runningApplications = runningApps; } public RMNode getReconnectedNode() { return reconnectedNode; } + + public List<ApplicationId> getRunningApplications() { + return runningApplications; + } }
