Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Thu Oct 31 18:49:54 2013 @@ -18,7 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -27,6 +34,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import
org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -37,9 +46,6 @@ import org.apache.hadoop.yarn.security.c import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.ZKUtil; - import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -51,13 +57,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import com.google.common.annotations.VisibleForTesting; @Private @Unstable @@ -224,8 +224,11 @@ public class ZKRMStateStore extends RMSt ApplicationStateDataProto.parseFrom(childData)); ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); + appStateData.getStartTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser(), + appStateData.getState(), + appStateData.getDiagnostics(), appStateData.getFinishTime()); if (!appId.equals(appState.context.getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); @@ -249,7 +252,12 @@ public class ZKRMStateStore extends RMSt } ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), + attemptStateData.getState(), +
attemptStateData.getFinalTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus()); if (!attemptId.equals(attemptState.getAttemptId())) { throw new YarnRuntimeException("The child node name is different " + "from the application attempt id"); @@ -280,21 +288,34 @@ public class ZKRMStateStore extends RMSt } @Override - public synchronized void storeApplicationState( - String appId, ApplicationStateDataPBImpl appStateDataPB) throws - Exception { + public synchronized void storeApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, appId); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - createWithRetries( - nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + createWithRetries(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); + + } + + @Override + public synchronized void updateApplicationStateInternal(String appId, + ApplicationStateDataPBImpl appStateDataPB) throws Exception { + String nodeCreatePath = getNodePath(rmAppRoot, appId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Storing final state info for app: " + appId + " at: " + + nodeCreatePath); + } + byte[] appStateData = appStateDataPB.getProto().toByteArray(); + setDataWithRetries(nodeCreatePath, appStateData, -1 /* -1 matches any znode version, so a re-sent or repeated update is not rejected with BadVersion */); } @Override - public synchronized void storeApplicationAttemptState( + public synchronized void storeApplicationAttemptStateInternal( String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, attemptId); @@ -304,7 +325,20 @@ public class ZKRMStateStore extends RMSt } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); createWithRetries(nodeCreatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); + CreateMode.PERSISTENT);
+ } + + @Override + public synchronized void updateApplicationAttemptStateInternal( + String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + throws Exception { + String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing final state info for attempt: " + attemptId + " at: " + + nodeCreatePath); + } + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); + setDataWithRetries(nodeCreatePath, attemptStateData, -1 /* -1 matches any znode version, so a re-sent or repeated update is not rejected with BadVersion */); } @Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Thu Oct 31 18:49:54 2013 @@ -24,6 +24,8 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; /* * Contains the state data that needs to be persisted for an ApplicationAttempt @@ -61,4 +63,50 @@ public interface ApplicationAttemptState public ByteBuffer getAppAttemptTokens(); public void setAppAttemptTokens(ByteBuffer attemptTokens); + + /** + * Get the final state of the application attempt. + * @return the final state of the application attempt.
+ */ + public RMAppAttemptState getState(); + + public void setState(RMAppAttemptState state); + + /** + * Get the original not-proxied <em>final tracking url</em> for the + * application. This is intended to only be used by the proxy itself. + * + * @return the original not-proxied <em>final tracking url</em> for the + * application + */ + public String getFinalTrackingUrl(); + + /** + * Set the final tracking Url of the AM. + * @param url the final tracking url of the AM + */ + public void setFinalTrackingUrl(String url); + /** + * Get the <em>diagnostic information</em> of the attempt + * @return <em>diagnostic information</em> of the attempt + */ + public String getDiagnostics(); + + public void setDiagnostics(String diagnostics); + + /** + * Get the <em>start time</em> of the application attempt. + * @return <em>start time</em> of the application attempt + */ + public long getStartTime(); + + public void setStartTime(long startTime); + + /** + * Get the <em>final finish status</em> of the application. + * @return <em>final finish status</em> of the application + */ + public FinalApplicationStatus getFinalApplicationStatus(); + + public void setFinalApplicationStatus(FinalApplicationStatus finishState); } Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== ---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Thu Oct 31 18:49:54 2013 @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; /** * Contains all the state data that needs to be stored persistently @@ -42,7 +45,19 @@ public interface ApplicationStateData { @Public @Unstable public void setSubmitTime(long submitTime); - + + /** + * Get the <em>start time</em> of the application. + * @return <em>start time</em> of the application + */ + @Public + @Stable + public abstract long getStartTime(); + + @Private + @Unstable + public abstract void setStartTime(long startTime); + /** * The application submitter */ @@ -66,6 +81,29 @@ public interface ApplicationStateData { @Public @Unstable public void setApplicationSubmissionContext( - ApplicationSubmissionContext context); + ApplicationSubmissionContext context); + + /** + * Get the final state of the application. + * @return the final state of the application. + */ + public RMAppState getState(); + + public void setState(RMAppState state); + + /** + * Get the diagnostics information for the application master.
+ * @return the diagnostics information for the application master. + */ + public String getDiagnostics(); + + public void setDiagnostics(String diagnostics); + + /** + * The finish time of the application. + * @return the finish time of the application. + */ + public long getFinishTime(); + public void setFinishTime(long finishTime); } Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Thu Oct 31 18:49:54 2013 @@ -22,14 +22,19 @@ import java.nio.ByteBuffer; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import
org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; public class ApplicationAttemptStateDataPBImpl extends ProtoBase<ApplicationAttemptStateDataProto> @@ -156,14 +161,125 @@ implements ApplicationAttemptStateData { this.appAttemptTokens = attemptTokens; } + @Override + public RMAppAttemptState getState() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAppAttemptState()) { + return null; + } + return convertFromProtoFormat(p.getAppAttemptState()); + } + + @Override + public void setState(RMAppAttemptState state) { + maybeInitBuilder(); + if (state == null) { + builder.clearAppAttemptState(); + return; + } + builder.setAppAttemptState(convertToProtoFormat(state)); + } + + @Override + public String getFinalTrackingUrl() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFinalTrackingUrl()) { + return null; + } + return p.getFinalTrackingUrl(); + } + + @Override + public void setFinalTrackingUrl(String url) { + maybeInitBuilder(); + if (url == null) { + builder.clearFinalTrackingUrl(); + return; + } + builder.setFinalTrackingUrl(url); + } + + @Override + public String getDiagnostics() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ?
proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getStartTime() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public FinalApplicationStatus getFinalApplicationStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFinalApplicationStatus()) { + return null; + } + return convertFromProtoFormat(p.getFinalApplicationStatus()); + } + + @Override + public void setFinalApplicationStatus(FinalApplicationStatus finishState) { + maybeInitBuilder(); + if (finishState == null) { + builder.clearFinalApplicationStatus(); + return; + } + builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); + } + public static ApplicationAttemptStateData newApplicationAttemptStateData( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens) { + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus) { ApplicationAttemptStateData attemptStateData = recordFactory.newRecordInstance(ApplicationAttemptStateData.class); attemptStateData.setAttemptId(attemptId); attemptStateData.setMasterContainer(container); attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setFinalTrackingUrl(finalTrackingUrl); + attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); return
attemptStateData; } + + private static final String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; + public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { + return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); + } + public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) { + return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, "")); + } + + private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) { + return ProtoUtils.convertToProtoFormat(s); + } + private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { + return ProtoUtils.convertFromProtoFormat(s); + } + } Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Thu Oct 31 18:49:54 2013 @@ -21,14 +21,20 @@ package org.apache.hadoop.yarn.server.re import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; public class ApplicationStateDataPBImpl extends ProtoBase<ApplicationStateDataProto> implements ApplicationStateData { - + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.Builder builder = null; @@ -92,6 +98,18 @@ implements ApplicationStateData { } @Override + public long getStartTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override public String getUser() { ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasUser()) { @@ -132,4 +150,78 @@ implements ApplicationStateData { this.applicationSubmissionContext = context; } + @Override + public RMAppState getState() { + ApplicationStateDataProtoOrBuilder p = viaProto ?
proto : builder; + if (!p.hasApplicationState()) { + return null; + } + return convertFromProtoFormat(p.getApplicationState()); + } + + @Override + public void setState(RMAppState finalState) { + maybeInitBuilder(); + if (finalState == null) { + builder.clearApplicationState(); + return; + } + builder.setApplicationState(convertToProtoFormat(finalState)); + } + + @Override + public String getDiagnostics() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnostics); + } + + @Override + public long getFinishTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } + + public static ApplicationStateData newApplicationStateData(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, RMAppState state, + String diagnostics, long finishTime) { + + ApplicationStateData appState = + recordFactory.newRecordInstance(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + private static final String RM_APP_PREFIX = "RMAPP_"; + public static RMAppStateProto convertToProtoFormat(RMAppState e) { + return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name()); + } + public static RMAppState convertFromProtoFormat(RMAppStateProto e) { + return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); + } } Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Thu Oct 31 18:49:54 2013 @@ -35,6 +35,7 @@ public enum RMAppEventType { NODE_UPDATE, // Source: RMStateStore - APP_SAVED, + APP_NEW_SAVED, + APP_UPDATE_SAVED, APP_REMOVED } Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ 
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Oct 31 18:49:54 2013 @@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -103,7 +101,8 @@ public class RMAppImpl implements RMApp, // Mutable fields private long startTime; - private long finishTime; + private long finishTime = 0; + private long storedFinishTime = 0; private RMAppAttempt currentAttempt; private String queue; @SuppressWarnings("rawtypes") @@ -111,8 +110,11 @@ public class RMAppImpl implements RMApp, private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); - private boolean isAppRemovalRequestSent = false; - private RMAppState previousStateAtRemoving; + private RMAppState stateBeforeFinalSaving; + private RMAppEvent eventCausingFinalSaving; + private RMAppState targetedFinalState; + private RMAppState recoveredFinalState; + Object transitionTodo; private static final StateMachineFactory<RMAppImpl, RMAppState, @@ -129,32 +131,45 @@ public class 
RMAppImpl implements RMApp, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppSavingTransition()) - .addTransition(RMAppState.NEW, RMAppState.SUBMITTED, - RMAppEventType.RECOVER, new StartAppAttemptTransition()) - .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, - new AppKilledTransition()) - .addTransition(RMAppState.NEW, RMAppState.FAILED, - RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, + RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, + RMAppState.FINAL_SAVING), + RMAppEventType.RECOVER, new RMAppRecoveredTransition()) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, + new FinalSavingTransition( + new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition( + new AppRejectedTransition(), RMAppState.FAILED)) // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, - RMAppEventType.APP_SAVED, new StartAppAttemptTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED, - RMAppEventType.KILL, new AppKilledTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED, - RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition( + new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) // Transitions from 
SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED, - RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition( + new AppRejectedTransition(), RMAppState.FAILED)) .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED) - .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition( + new KillAppAndAttemptTransition(), RMAppState.KILLED)) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, @@ -162,37 +177,45 @@ public class RMAppImpl implements RMApp, .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, - EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED), + EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition( + new KillAppAndAttemptTransition(), RMAppState.KILLED)) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.REMOVING, - RMAppEventType.ATTEMPT_UNREGISTERED, - new RMAppRemovingTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition( + new 
AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, + // UnManagedAM directly jumps to finished RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) .addTransition(RMAppState.RUNNING, - EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED), + EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.SUBMITTED)) - .addTransition(RMAppState.RUNNING, RMAppState.KILLED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) - - // Transitions from REMOVING state - .addTransition(RMAppState.REMOVING, RMAppState.FINISHING, - RMAppEventType.APP_REMOVED, new RMAppFinishingTransition()) - .addTransition(RMAppState.REMOVING, RMAppState.FINISHED, - RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.REMOVING, RMAppState.KILLED, - RMAppEventType.KILL, new KillAppAndAttemptTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition( + new KillAppAndAttemptTransition(), RMAppState.KILLED)) + + // Transitions from FINAL_SAVING state + .addTransition(RMAppState.FINAL_SAVING, + EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED, + new FinalStateSavedTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_FINISHED, + new AttemptFinishedAtFinalSavingTransition()) // ignorable transitions - .addTransition(RMAppState.REMOVING, RMAppState.REMOVING, - RMAppEventType.NODE_UPDATE) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, @@ -201,7 +224,7 @@ public class RMAppImpl implements RMApp, RMAppEventType.KILL, new KillAppAndAttemptTransition()) // 
ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED)) + EnumSet.of(RMAppEventType.NODE_UPDATE)) // Transitions from FINISHED state // ignorable transitions @@ -210,14 +233,12 @@ public class RMAppImpl implements RMApp, RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL, - RMAppEventType.APP_REMOVED)) + RMAppEventType.KILL)) // Transitions from FAILED state // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, - RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -227,8 +248,7 @@ public class RMAppImpl implements RMApp, EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE, - RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED)) + RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE)) .installTopology(); @@ -316,9 +336,8 @@ public class RMAppImpl implements RMApp, @Override public RMAppState getState() { this.readLock.lock(); - try { - return this.stateMachine.getCurrentState(); + return this.stateMachine.getCurrentState(); } finally { this.readLock.unlock(); } @@ -398,7 +417,7 @@ public class RMAppImpl implements RMApp, case SUBMITTED: case ACCEPTED: case RUNNING: - case REMOVING: + case FINAL_SAVING: return FinalApplicationStatus.UNDEFINED; // finished without a proper final state is the same as failed case FINISHING: @@ -586,8 +605,12 @@ public class RMAppImpl implements RMApp, @Override public void recover(RMState state) throws Exception{ ApplicationState appState = state.getApplicationState().get(getApplicationId()); + 
this.recoveredFinalState = appState.getState(); LOG.info("Recovering app: " + getApplicationId() + " with " + - + appState.getAttemptCount() + " attempts"); + + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState ); + this.diagnostics.append(appState.getDiagnostics()); + this.storedFinishTime = appState.getFinishTime(); + this.startTime = appState.getStartTime(); for(int i=0; i<appState.getAttemptCount(); ++i) { // create attempt createNewAttempt(false); @@ -632,60 +655,195 @@ public class RMAppImpl implements RMApp, nodeUpdateEvent.getNode()); }; } - + + private static final class RMAppRecoveredTransition implements + MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { + @Override + public RMAppState transition(RMAppImpl app, RMAppEvent event) { + + if (app.recoveredFinalState != null) { + FINAL_TRANSITION.transition(app, event); + return app.recoveredFinalState; + } + // Directly call AttemptFailedTransition, since now we deem that an + // application fails because of RM restart as a normal AM failure. + + // Do not recover unmanaged applications since current recovery + // mechanism of restarting attempts does not work for them. + // This will need to be changed in work preserving recovery in which + // RM will re-connect with the running AM's instead of restarting them + + // In work-preserve restart, if attemptCount == maxAttempts, the job still + // needs to be recovered because the last attempt may still be running. + + // As part of YARN-1210, we may return ACCECPTED state waiting for AM to + // reregister or fail and remove the following code. 
+ return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app, + event); + } + } + private static final class StartAppAttemptTransition extends RMAppTransition { + @Override public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_SAVED)) { - assert app.getState().equals(RMAppState.NEW_SAVING); - RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; - if(storeEvent.getStoredException() != null) { - // For HA this exception needs to be handled by giving up - // master status if we got fenced - LOG.error("Failed to store application: " - + storeEvent.getApplicationId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } + RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event; + if (storeEvent.getStoredException() != null) { + // For HA this exception needs to be handled by giving up + // master status if we got fenced + LOG.error( + "Failed to store application: " + storeEvent.getApplicationId(), + storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); } - app.createNewAttempt(true); }; } - private static final class RMAppFinishingTransition extends RMAppTransition { + private static final class FinalStateSavedTransition implements + MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public RMAppState transition(RMAppImpl app, RMAppEvent event) { + RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event; + if (storeEvent.getUpdatedException() != null) { + LOG.error("Failed to update the final state of application" + + storeEvent.getApplicationId(), storeEvent.getUpdatedException()); + ExitUtil.terminate(1, storeEvent.getUpdatedException()); + } + + if (app.transitionTodo instanceof SingleArcTransition) { + ((SingleArcTransition) app.transitionTodo).transition(app, + app.eventCausingFinalSaving); + } else if (app.transitionTodo instanceof 
MultipleArcTransition) { + ((MultipleArcTransition) app.transitionTodo).transition(app, + app.eventCausingFinalSaving); + } + return app.targetedFinalState; + + } + } + + private static class AttemptFailedFinalStateSavedTransition extends + RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - if (event.getType().equals(RMAppEventType.APP_REMOVED)) { - RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event; - if (removeEvent.getRemovedException() != null) { - LOG.error( - "Failed to remove application: " + removeEvent.getApplicationId(), - removeEvent.getRemovedException()); - ExitUtil.terminate(1, removeEvent.getRemovedException()); - } + String msg = null; + if (event instanceof RMAppFailedAttemptEvent) { + msg = app.getAppAttemptFailedDiagnostics(event); } - app.finishTime = System.currentTimeMillis(); + LOG.info(msg); + app.diagnostics.append(msg); + // Inform the node for app-finish + FINAL_TRANSITION.transition(app, event); + } + } + + private String getAppAttemptFailedDiagnostics(RMAppEvent event) { + String msg = null; + RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; + if (this.submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + msg = "Unmanaged application " + this.getApplicationId() + + " failed due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } else if (this.attempts.size() >= this.maxAppAttempts) { + msg = "Application " + this.getApplicationId() + " failed " + + this.maxAppAttempts + " times due to " + + failedEvent.getDiagnostics() + ". 
Failing the application."; } + return msg; } private static final class RMAppSavingTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { + // If recovery is enabled then store the application information in a // non-blocking call so make sure that RM has stored the information // needed to restart the AM after RM restart without further client // communication LOG.info("Storing application with id " + app.applicationId); - app.rmContext.getStateStore().storeApplication(app); + app.rmContext.getStateStore().storeNewApplication(app); } } - private static final class RMAppRemovingTransition extends RMAppTransition { + private void rememberTargetTransitions(RMAppEvent event, + Object transitionToDo, RMAppState targetFinalState) { + transitionTodo = transitionToDo; + targetedFinalState = targetFinalState; + eventCausingFinalSaving = event; + } + + private void rememberTargetTransitionsAndStoreState(RMAppEvent event, + Object transitionToDo, RMAppState targetFinalState, + RMAppState stateToBeStored) { + rememberTargetTransitions(event, transitionToDo, targetFinalState); + this.stateBeforeFinalSaving = getState(); + this.storedFinishTime = System.currentTimeMillis(); + + LOG.info("Updating application " + this.applicationId + + " with final state: " + this.targetedFinalState); + // we lost attempt_finished diagnostics in app, because attempt_finished + // diagnostics is sent after app final state is saved. Later on, we will + // create GetApplicationAttemptReport specifically for getting per attempt + // info. 
+ String diags = null; + switch (event.getType()) { + case APP_REJECTED: + RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event; + diags = rejectedEvent.getMessage(); + break; + case ATTEMPT_FINISHED: + RMAppFinishedAttemptEvent finishedEvent = + (RMAppFinishedAttemptEvent) event; + diags = finishedEvent.getDiagnostics(); + break; + case ATTEMPT_FAILED: + RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; + diags = getAppAttemptFailedDiagnostics(failedEvent); + break; + case KILL: + diags = getAppKilledDiagnostics(); + break; + default: + break; + } + ApplicationState appState = + new ApplicationState(this.submitTime, this.startTime, + this.submissionContext, this.user, stateToBeStored, diags, + this.storedFinishTime); + this.rmContext.getStateStore().updateApplicationState(appState); + } + + private static final class FinalSavingTransition extends RMAppTransition { + Object transitionToDo; + RMAppState targetedFinalState; + RMAppState stateToBeStored; + + public FinalSavingTransition(Object transitionToDo, + RMAppState targetedFinalState) { + this(transitionToDo, targetedFinalState, targetedFinalState); + } + + public FinalSavingTransition(Object transitionToDo, + RMAppState targetedFinalState, RMAppState stateToBeStored) { + this.transitionToDo = transitionToDo; + this.targetedFinalState = targetedFinalState; + this.stateToBeStored = stateToBeStored; + } + @Override public void transition(RMAppImpl app, RMAppEvent event) { - LOG.info("Removing application with id " + app.applicationId); - app.removeApplicationState(); - app.previousStateAtRemoving = app.getState(); + app.rememberTargetTransitionsAndStoreState(event, transitionToDo, + targetedFinalState, stateToBeStored); + } + } + + private static class AttemptUnregisteredTransition extends RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + app.finishTime = app.storedFinishTime; } } @@ -698,6 +856,40 @@ public class RMAppImpl implements RMApp, 
}; } + private static class AttemptFinishedAtFinalSavingTransition extends + RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + if (app.targetedFinalState.equals(RMAppState.FAILED) + || app.targetedFinalState.equals(RMAppState.KILLED)) { + // Ignore Attempt_Finished event if we were supposed to reach FAILED + // FINISHED state + return; + } + + // pass in the earlier attempt_unregistered event, as it is needed in + // AppFinishedFinalStateSavedTransition later on + app.rememberTargetTransitions(event, + new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving), + RMAppState.FINISHED); + }; + } + + private static class AppFinishedFinalStateSavedTransition extends + RMAppTransition { + RMAppEvent attemptUnregistered; + + public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) { + this.attemptUnregistered = attemptUnregistered; + } + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + new AttemptUnregisteredTransition().transition(app, attemptUnregistered); + FINISHED_TRANSITION.transition(app, event); + }; + } + + private static class AppKilledTransition extends FinalTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { @@ -706,6 +898,10 @@ public class RMAppImpl implements RMApp, }; } + private static String getAppKilledDiagnostics() { + return "Application killed by user."; + } + private static class KillAppAndAttemptTransition extends AppKilledTransition { @SuppressWarnings("unchecked") @Override @@ -741,12 +937,10 @@ public class RMAppImpl implements RMApp, app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } - if (app.getState() != RMAppState.FINISHING) { + app.finishTime = app.storedFinishTime; + if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - // application completely done and remove from state store. 
- app.removeApplicationState(); - app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); @@ -764,32 +958,15 @@ public class RMAppImpl implements RMApp, @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - - RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); - boolean retryApp = true; - String msg = null; - if (app.submissionContext.getUnmanagedAM()) { - // RM does not manage the AM. Do not retry - retryApp = false; - msg = "Unmanaged application " + app.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } else if (app.attempts.size() >= app.maxAppAttempts) { - retryApp = false; - msg = "Application " + app.getApplicationId() + " failed " - + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics() - + ". Failing the application."; - } - - if (retryApp) { + if (!app.submissionContext.getUnmanagedAM() + && app.attempts.size() < app.maxAppAttempts) { app.createNewAttempt(true); return initialState; } else { - LOG.info(msg); - app.diagnostics.append(msg); - // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); - return RMAppState.FAILED; + app.rememberTargetTransitionsAndStoreState(event, + new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, + RMAppState.FAILED); + return RMAppState.FINAL_SAVING; } } @@ -814,9 +991,9 @@ public class RMAppImpl implements RMApp, @Override public YarnApplicationState createApplicationState() { RMAppState rmAppState = getState(); - // If App is in REMOVING state, return its previous state. - if (rmAppState.equals(RMAppState.REMOVING)) { - rmAppState = previousStateAtRemoving; + // If App is in FINAL_SAVING state, return its previous state. 
+ if (rmAppState.equals(RMAppState.FINAL_SAVING)) { + rmAppState = stateBeforeFinalSaving; } switch (rmAppState) { case NEW: @@ -840,11 +1017,4 @@ public class RMAppImpl implements RMApp, throw new YarnRuntimeException("Unknown state passed!"); } } - - private void removeApplicationState(){ - if (!isAppRemovalRequestSent) { - rmContext.getStateStore().removeApplication(this); - isAppRemovalRequestSent = true; - } - } } Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Thu Oct 31 18:49:54 2013 @@ -24,7 +24,7 @@ public enum RMAppState { SUBMITTED, ACCEPTED, RUNNING, - REMOVING, + FINAL_SAVING, FINISHING, FINISHED, FAILED, Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1537584&r1=1537583&r2=1537584&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Thu Oct 31 18:49:54 2013 @@ -41,7 +41,8 @@ public enum RMAppAttemptEventType { CONTAINER_FINISHED, // Source: RMStateStore - ATTEMPT_SAVED, + ATTEMPT_NEW_SAVED, + ATTEMPT_UPDATE_SAVED, // Source: Scheduler APP_REJECTED,
