Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Aug 20 01:34:29 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no import static org.apache.hadoop.service.Service.STATE.STARTED; +import java.io.DataInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -42,6 +43,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; @@ -119,11 +126,15 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -224,11 +235,104 @@ public class ContainerManagerImpl extend recover(); } + @SuppressWarnings("unchecked") private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); if (stateStore.canRecover()) { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); + + RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); + for (ContainerManagerApplicationProto proto : + appsState.getApplications()) { + recoverApplication(proto); + } + + for (RecoveredContainerState rcs : stateStore.loadContainersState()) { + recoverContainer(rcs); + } + + String diagnostic = "Application marked finished during recovery"; + for (ApplicationId appId : appsState.getFinishedApplications()) { + dispatcher.getEventHandler().handle( + new ApplicationFinishEvent(appId, diagnostic)); + } + } + } + + private void recoverApplication(ContainerManagerApplicationProto p) + throws IOException { + ApplicationId appId = new ApplicationIdPBImpl(p.getId()); + Credentials creds = new Credentials(); + creds.readTokenStorageStream( + new DataInputStream(p.getCredentials().newInput())); + + List<ApplicationACLMapProto> aclProtoList = p.getAclsList(); + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(aclProtoList.size()); + for (ApplicationACLMapProto aclProto : aclProtoList) { + acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()), + aclProto.getAcl()); + } + + LOG.info("Recovering application " + appId); + ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, + creds, context); + context.getApplications().put(appId, app); + app.handle(new ApplicationInitEvent(appId, acls)); + } + + @SuppressWarnings("unchecked") + private void recoverContainer(RecoveredContainerState rcs) + throws IOException { + StartContainerRequest req = rcs.getStartRequest(); + ContainerLaunchContext launchContext = req.getContainerLaunchContext(); + ContainerTokenIdentifier token = + BuilderUtils.newContainerTokenIdentifier(req.getContainerToken()); + ContainerId containerId = token.getContainerID(); + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Recovering " + containerId + " in state " + rcs.getStatus() + + " with exit code " + rcs.getExitCode()); + + if (context.getApplications().containsKey(appId)) { + Credentials credentials = parseCredentials(launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + context.getNMStateStore(), req.getContainerLaunchContext(), + credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), + rcs.getDiagnostics(), rcs.getKilled()); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); + } else { + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { + LOG.warn(containerId + " has no corresponding application!"); + } + LOG.info("Adding " + containerId + " to recently stopped containers"); + nodeStatusUpdater.addCompletedContainer(containerId); + } + } + + private void waitForRecoveredContainers() throws InterruptedException { + final int sleepMsec = 100; + int waitIterations = 100; + List<ContainerId> newContainers = new ArrayList<ContainerId>(); + while (--waitIterations >= 0) { + newContainers.clear(); + for (Container container : context.getContainers().values()) { + if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) { + newContainers.add(container.getContainerId()); + } + } + if (newContainers.isEmpty()) { + break; + } + LOG.info("Waiting for containers: " + newContainers); + Thread.sleep(sleepMsec); + } + if (waitIterations < 0) { + LOG.warn("Timeout waiting for recovered containers"); } } @@ -265,6 +369,23 @@ public class ContainerManagerImpl extend // Enqueue user dirs in deletion context Configuration conf = getConfig(); + final InetSocketAddress initialAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_ADDRESS, + YarnConfiguration.DEFAULT_NM_PORT); + boolean usingEphemeralPort = (initialAddress.getPort() == 0); + if (context.getNMStateStore().canRecover() && usingEphemeralPort) { + throw new IllegalArgumentException("Cannot support recovery with an " + + "ephemeral server port. Check the setting of " + + YarnConfiguration.NM_ADDRESS); + } + // If recovering then delay opening the RPC service until the recovery + // of resources and containers have completed, otherwise requests from + // clients during recovery can interfere with the recovery process. + final boolean delayedRpcServerStart = + context.getNMStateStore().canRecover(); + Configuration serverConf = new Configuration(conf); // always enforce it to be token-based. @@ -274,11 +395,6 @@ public class ContainerManagerImpl extend YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress initialAddress = conf.getSocketAddr( - YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_PORT); - server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(), @@ -295,16 +411,61 @@ public class ContainerManagerImpl extend LOG.info("Blocking new container-requests as container manager rpc" + " server is still starting."); this.setBlockNewContainerRequests(true); - server.start(); - InetSocketAddress connectAddress = NetUtils.getConnectAddress(server); - NodeId nodeId = NodeId.newInstance( - connectAddress.getAddress().getCanonicalHostName(), - connectAddress.getPort()); + + String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST); + String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS); + String hostOverride = null; + if (bindHost != null && !bindHost.isEmpty() + && nmAddress != null && !nmAddress.isEmpty()) { + //a bind-host case with an address, to support overriding the first + //hostname found when querying for our hostname with the specified + //address, combine the specified address with the actual port listened + //on by the server + hostOverride = nmAddress.split(":")[0]; + } + + // setup node ID + InetSocketAddress connectAddress; + if (delayedRpcServerStart) { + connectAddress = NetUtils.getConnectAddress(initialAddress); + } else { + server.start(); + connectAddress = NetUtils.getConnectAddress(server); + } + NodeId nodeId = buildNodeId(connectAddress, hostOverride); ((NodeManager.NMContext)context).setNodeId(nodeId); this.context.getNMTokenSecretManager().setNodeId(nodeId); this.context.getContainerTokenSecretManager().setNodeId(nodeId); - LOG.info("ContainerManager started at " + connectAddress); + + // start remaining services super.serviceStart(); + + if (delayedRpcServerStart) { + waitForRecoveredContainers(); + server.start(); + + // check that the node ID is as previously advertised + connectAddress = NetUtils.getConnectAddress(server); + NodeId serverNode = buildNodeId(connectAddress, hostOverride); + if (!serverNode.equals(nodeId)) { + throw new IOException("Node mismatch after server started, expected '" + + nodeId + "' but found '" + serverNode + "'"); + } + } + + LOG.info("ContainerManager started at " + connectAddress); + LOG.info("ContainerManager bound to " + initialAddress); + } + + private NodeId buildNodeId(InetSocketAddress connectAddress, + String hostOverride) { + if (hostOverride != null) { + connectAddress = NetUtils.getConnectAddress( + new InetSocketAddress(hostOverride, connectAddress.getPort())); + } + return NodeId.newInstance( + connectAddress.getAddress().getCanonicalHostName(), + connectAddress.getPort()); } void refreshServiceAcls(Configuration configuration, @@ -341,6 +502,12 @@ public class ContainerManagerImpl extend } LOG.info("Applications still running : " + applications.keySet()); + if (this.context.getNMStateStore().canRecover() + && !this.context.getDecommissioned()) { + // do not cleanup apps as they can be recovered on restart + return; + } + List<ApplicationId> appIds = new ArrayList<ApplicationId>(applications.keySet()); this.handle( @@ -497,6 +664,8 @@ public class ContainerManagerImpl extend messageBuilder.append("\nThis token is expired. current time is ") .append(System.currentTimeMillis()).append(" found ") .append(containerTokenIdentifier.getExpiryTimeStamp()); + messageBuilder.append("\nNote: System times on machines may be out of sync.") + .append(" Check system time and time zones."); } if (unauthorized) { String msg = messageBuilder.toString(); @@ -548,6 +717,41 @@ public class ContainerManagerImpl extend succeededContainers, failedContainers); } + private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, + String user, Credentials credentials, + Map<ApplicationAccessType, String> appAcls) { + + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId).getProto()); + builder.setUser(user); + + builder.clearCredentials(); + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + builder.setCredentials(ByteString.copyFrom(dob.getData())); + } catch (IOException e) { + // should not occur + LOG.error("Cannot serialize credentials", e); + } + } + + builder.clearAcls(); + if (appAcls != null) { + for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) { + ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()) + .build(); + builder.addAcls(p); + } + } + + return builder.build(); + } + @SuppressWarnings("unchecked") private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, @@ -600,7 +804,8 @@ public class ContainerManagerImpl extend Credentials credentials = parseCredentials(launchContext); Container container = - new ContainerImpl(getConfig(), this.dispatcher, launchContext, + new ContainerImpl(getConfig(), this.dispatcher, + context.getNMStateStore(), launchContext, credentials, metrics, containerTokenIdentifier); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); @@ -621,12 +826,15 @@ public class ContainerManagerImpl extend if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); - + Map<ApplicationAccessType, String> appAcls = + container.getLaunchContext().getApplicationACLs(); + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, appAcls)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, container.getLaunchContext() - .getApplicationACLs())); + new ApplicationInitEvent(applicationID, appAcls)); } + this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -674,7 +882,7 @@ public class ContainerManagerImpl extend } private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws YarnException { + throws IOException { Credentials credentials = new Credentials(); // //////////// Parse credentials ByteBuffer tokens = launchContext.getTokens(); @@ -683,15 +891,11 @@ public class ContainerManagerImpl extend DataInputByteBuffer buf = new DataInputByteBuffer(); tokens.rewind(); buf.reset(tokens); - try { - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); } - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); } } // //////////// End of parsing credentials @@ -724,7 +928,7 @@ public class ContainerManagerImpl extend @SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + ContainerId containerID) throws YarnException, IOException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); @@ -737,6 +941,7 @@ public class ContainerManagerImpl extend + " is not handled by this NodeManager"); } } else { + context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, ContainerExitStatus.KILLED_BY_APPMASTER, @@ -875,6 +1080,11 @@ public class ContainerManagerImpl extend } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) { diagnostic = "Application killed by ResourceManager"; } + try { + this.context.getNMStateStore().storeFinishedApplication(appID); + } catch (IOException e) { + LOG.error("Unable to update application state in store", e); + } this.dispatcher.getEventHandler().handle( new ApplicationFinishEvent(appID, diagnostic));
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Aug 20 01:34:29 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -428,6 +429,11 @@ public class ApplicationImpl implements ApplicationId appId = event.getApplicationID(); app.context.getApplications().remove(appId); app.aclsManager.removeApplication(appId); + try { + app.context.getNMStateStore().removeApplication(appId); + } catch (IOException e) { + LOG.error("Unable to remove application from state store", e); + } } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Aug 20 01:34:29 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -75,6 +78,7 @@ public class ContainerImpl implements Co private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; + private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; private final ContainerLaunchContext launchContext; @@ -101,12 +105,19 @@ public class ContainerImpl implements Co private final List<LocalResourceRequest> appRsrcs = new ArrayList<LocalResourceRequest>(); + // whether container has been recovered after a restart + private RecoveredContainerStatus recoveredStatus = + RecoveredContainerStatus.REQUESTED; + // whether container was marked as killed after recovery + private boolean recoveredAsKilled = false; + public ContainerImpl(Configuration conf, Dispatcher dispatcher, - ContainerLaunchContext launchContext, Credentials creds, - NodeManagerMetrics metrics, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier) { this.daemonConf = conf; this.dispatcher = dispatcher; + this.stateStore = stateStore; this.launchContext = launchContext; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); @@ -122,6 +133,21 @@ public class ContainerImpl implements Co stateMachine = stateMachineFactory.make(this); } + // constructor for a recovered container + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + NMStateStoreService stateStore, ContainerLaunchContext launchContext, + Credentials creds, NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, + RecoveredContainerStatus recoveredStatus, int exitCode, + String diagnostics, boolean wasKilled) { + this(conf, dispatcher, stateStore, launchContext, creds, metrics, + containerTokenIdentifier); + this.recoveredStatus = recoveredStatus; + this.exitCode = exitCode; + this.recoveredAsKilled = wasKilled; + this.diagnostics.append(diagnostics); + } + private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = new ContainerDoneTransition(); @@ -135,8 +161,10 @@ public class ContainerImpl implements Co new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW) // From NEW State .addTransition(ContainerState.NEW, - EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED, - ContainerState.LOCALIZATION_FAILED), + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.LOCALIZED, + ContainerState.LOCALIZATION_FAILED, + ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, @@ -281,7 +309,9 @@ public class ContainerImpl implements Co UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -295,7 +325,9 @@ public class ContainerImpl implements Co // we notify container of failed localization if localizer thread (for // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.RESOURCE_FAILED) + EnumSet.of(ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) // create the topology tables .installTopology(); @@ -420,7 +452,7 @@ public class ContainerImpl implements Co } } - @SuppressWarnings({"fallthrough", "unchecked"}) + @SuppressWarnings("fallthrough") private void finished() { ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); @@ -458,7 +490,11 @@ public class ContainerImpl implements Co } metrics.releaseContainer(this.resource); + sendFinishedEvents(); + } + @SuppressWarnings("unchecked") + private void sendFinishedEvents() { // Inform the application @SuppressWarnings("rawtypes") EventHandler eventHandler = dispatcher.getEventHandler(); @@ -471,6 +507,45 @@ public class ContainerImpl implements Co } @SuppressWarnings("unchecked") // dispatcher not typed + private void sendLaunchEvent() { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.LAUNCH_CONTAINER; + if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { + // try to recover a container that was previously launched + launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); + } + + // Inform the ContainersMonitor to start monitoring the container's + // resource usage. + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendContainerMonitorStartEvent() { + long pmemBytes = getResource().getMemory() * 1024 * 1024L; + float pmemRatio = daemonConf.getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + + dispatcher.getEventHandler().handle( + new ContainerStartMonitoringEvent(containerId, + vmemBytes, pmemBytes)); + } + + private void addDiagnostics(String... diags) { + for (String s : diags) { + this.diagnostics.append(s); + } + try { + stateStore.storeContainerDiagnostics(containerId, diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update diagnostics in state store for " + + containerId, e); + } + } + + @SuppressWarnings("unchecked") // dispatcher not typed public void cleanup() { Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc = new HashMap<LocalResourceVisibility, @@ -518,6 +593,16 @@ public class ContainerImpl implements Co @Override public ContainerState transition(ContainerImpl container, ContainerEvent event) { + if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { + container.sendFinishedEvents(); + return ContainerState.DONE; + } else if (container.recoveredAsKilled && + container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { + // container was killed but never launched + container.finished(); + return ContainerState.DONE; + } + final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); @@ -593,9 +678,7 @@ public class ContainerImpl implements Co new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -606,7 +689,6 @@ public class ContainerImpl implements Co * Transition when one of the requested resources for this container * has been successfully localized. */ - @SuppressWarnings("unchecked") // dispatcher not typed static class LocalizedTransition implements MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> { @Override @@ -626,9 +708,8 @@ public class ContainerImpl implements Co if (!container.pendingResources.isEmpty()) { return ContainerState.LOCALIZING; } - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.LAUNCH_CONTAINER)); + + container.sendLaunchEvent(); container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -638,24 +719,22 @@ public class ContainerImpl implements Co * Transition from LOCALIZED state to RUNNING state upon receiving * a CONTAINER_LAUNCHED event */ - @SuppressWarnings("unchecked") // dispatcher not typed static class LaunchTransition extends ContainerTransition { + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { - // Inform the ContainersMonitor to start monitoring the container's - // resource usage. - long pmemBytes = - container.getResource().getMemory() * 1024 * 1024L; - float pmemRatio = container.daemonConf.getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - long vmemBytes = (long) (pmemRatio * pmemBytes); - - container.dispatcher.getEventHandler().handle( - new ContainerStartMonitoringEvent(container.containerId, - vmemBytes, pmemBytes)); + container.sendContainerMonitorStartEvent(); container.metrics.runningContainer(); container.wasLaunched = true; + + if (container.recoveredAsKilled) { + LOG.info("Killing " + container.containerId + + " due to recovered as killed"); + container.addDiagnostics("Container recovered as killed.\n"); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } } } @@ -707,8 +786,7 @@ public class ContainerImpl implements Co ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // TODO: Add containerWorkDir to the deletion service. @@ -735,7 +813,7 @@ public class ContainerImpl implements Co @Override public void transition(ContainerImpl container, ContainerEvent event) { super.transition(container, event); - container.diagnostics.append("Killed by external signal\n"); + container.addDiagnostics("Killed by external signal\n"); } } @@ -750,9 +828,7 @@ public class ContainerImpl implements Co ContainerResourceFailedEvent rsrcFailedEvent = (ContainerResourceFailedEvent) event; - container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage() - + "\n"); - + container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n"); // Inform the localizer to decrement reference counts and cleanup // resources. @@ -775,8 +851,8 @@ public class ContainerImpl implements Co container.metrics.endInitingContainer(); ContainerKillEvent killEvent = (ContainerKillEvent) event; container.exitCode = killEvent.getContainerExitStatus(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); } } @@ -817,7 +893,7 @@ public class ContainerImpl implements Co new ContainersLauncherEvent(container, ContainersLauncherEventType.CLEANUP_CONTAINER)); ContainerKillEvent killEvent = (ContainerKillEvent) event; - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); container.exitCode = killEvent.getContainerExitStatus(); } } @@ -836,8 +912,7 @@ public class ContainerImpl implements Co } if (exitEvent.getDiagnosticInfo() != null) { - container.diagnostics.append(exitEvent.getDiagnosticInfo()) - .append('\n'); + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } // The process/process-grp is killed. Decrement reference counts and @@ -877,8 +952,8 @@ public class ContainerImpl implements Co public void transition(ContainerImpl container, ContainerEvent event) { ContainerKillEvent killEvent = (ContainerKillEvent) event; container.exitCode = killEvent.getContainerExitStatus(); - container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); - container.diagnostics.append("Container is killed before being launched.\n"); + container.addDiagnostics(killEvent.getDiagnostic(), "\n"); + container.addDiagnostics("Container is killed before being launched.\n"); super.transition(container, event); } } @@ -892,8 +967,14 @@ public class ContainerImpl implements Co public void transition(ContainerImpl container, ContainerEvent event) { ContainerDiagnosticsUpdateEvent updateEvent = (ContainerDiagnosticsUpdateEvent) event; - container.diagnostics.append(updateEvent.getDiagnosticsUpdate()) - .append("\n"); + container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n"); + try { + container.stateStore.storeContainerDiagnostics(container.containerId, + container.diagnostics); + } catch (IOException e) { + LOG.warn("Unable to update state store diagnostics for " + + container.containerId, e); + } } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Aug 20 01:34:29 2014 @@ -87,22 +87,23 @@ public class ContainerLaunch implements public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; private static final String PID_FILE_NAME_FMT = "%s.pid"; + private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; - private final Dispatcher dispatcher; - private final ContainerExecutor exec; + protected final Dispatcher dispatcher; + protected final ContainerExecutor exec; private final Application app; - private final Container container; + protected final Container container; private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); - private volatile AtomicBoolean completed = new AtomicBoolean(false); + protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); + protected AtomicBoolean completed = new AtomicBoolean(false); private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; - private Path pidFilePath = null; + protected Path pidFilePath = null; private final LocalDirsHandlerService dirsHandler; @@ -223,14 +224,11 @@ public class ContainerLaunch implements + Path.SEPARATOR + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, false); - String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT, - containerIdStr); + String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not // accessible by users - pidFilePath = dirsHandler.getLocalPathForWrite( - ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR - + pidFileSuffix); + pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath); List<String> localDirs = dirsHandler.getLocalDirs(); List<String> logDirs = dirsHandler.getLogDirs(); @@ -288,6 +286,7 @@ public class ContainerLaunch implements dispatcher.getEventHandler().handle(new ContainerEvent( containerID, ContainerEventType.CONTAINER_LAUNCHED)); + context.getNMStateStore().storeContainerLaunched(containerID); // Check if the container is signalled to be killed. if (!shouldLaunchContainer.compareAndSet(false, true)) { @@ -310,6 +309,11 @@ public class ContainerLaunch implements } finally { completed.set(true); exec.deactivateContainer(containerID); + try { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerID); + } } if (LOG.isDebugEnabled()) { @@ -342,6 +346,11 @@ public class ContainerLaunch implements ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); return 0; } + + protected String getPidFileSubpath(String appIdStr, String containerIdStr) { + return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr); + } /** * Cleanup the container. @@ -357,6 +366,13 @@ public class ContainerLaunch implements String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); + try { + context.getNMStateStore().storeContainerKilled(containerId); + } catch (IOException e) { + LOG.error("Unable to mark container " + containerId + + " killed in store", e); + } + // launch flag will be set to true if process already launched boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); if (!alreadyLaunched) { @@ -421,6 +437,7 @@ public class ContainerLaunch implements if (pidFilePath != null) { FileContext lfs = FileContext.getLocalFSFileContext(); lfs.delete(pidFilePath, false); + lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); } } } @@ -479,6 +496,10 @@ public class ContainerLaunch implements + appIdStr; } + Context getContext() { + return context; + } + @VisibleForTesting static abstract class ShellScriptBuilder { public static ShellScriptBuilder create() { @@ -787,4 +808,7 @@ public class ContainerLaunch implements } } + public static String getExitCodeFile(String pidFile) { + return pidFile + EXIT_CODE_FILE_SUFFIX; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Aug 20 01:34:29 2014 @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -107,7 +101,6 @@ public class ContainersLauncher extends super.serviceStop(); } - @SuppressWarnings("unchecked") @Override public void handle(ContainersLauncherEvent event) { // TODO: ContainersLauncher launches containers one by one!! @@ -125,6 +118,14 @@ public class ContainersLauncher extends containerLauncher.submit(launch); running.put(containerId, launch); break; + case RECOVER_CONTAINER: + app = context.getApplications().get( + containerId.getApplicationAttemptId().getApplicationId()); + launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, + exec, app, event.getContainer(), dirsHandler, containerManager); + containerLauncher.submit(launch); + running.put(containerId, launch); + break; case CLEANUP_CONTAINER: ContainerLaunch launcher = running.remove(containerId); if (launcher == null) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Wed Aug 20 01:34:29 2014 @@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no public enum ContainersLauncherEventType { LAUNCH_CONTAINER, + RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Aug 20 01:34:29 2014 @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Even import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; @@ -251,6 +252,7 @@ public class ResourceLocalizationService cacheCleanupPeriod = conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS); localizationServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); @@ -341,7 +343,9 @@ public class ResourceLocalizationService server = createServer(); server.start(); localizationServerAddress = - getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS, + getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, server.getListenerAddress()); LOG.info("Localizer started on port " + server.getPort()); super.serviceStart(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Aug 20 01:34:29 2014 @@ -25,5 +25,7 @@ public interface AppLogAggregator extend void startContainerLogAggregation(ContainerId containerId, boolean wasContainerSuccessful); + void abortLogAggregation(); + void finishLogAggregation(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Aug 20 01:34:29 2014 @@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem private final BlockingQueue<ContainerId> pendingContainers; private final AtomicBoolean appFinishing = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); + private final AtomicBoolean aborted = new AtomicBoolean(); private final Map<ApplicationAccessType, String> appAcls; private LogWriter writer = null; @@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem private void doAppLogAggregation() { ContainerId containerId; - while (!this.appFinishing.get()) { + while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { wait(THREAD_SLEEP_TIME); @@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem } } + if (this.aborted.get()) { + return; + } + // Application is finished. Finish pending-containers while ((containerId = this.pendingContainers.poll()) != null) { uploadLogsForContainer(containerId); @@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem this.appFinishing.set(true); this.notifyAll(); } + + @Override + public synchronized void abortLogAggregation() { + LOG.info("Aborting log aggregation for " + this.applicationId); + this.aborted.set(true); + this.notifyAll(); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Aug 20 01:34:29 2014 @@ -142,9 +142,17 @@ public class LogAggregationService exten private void stopAggregators() { threadPool.shutdown(); + // if recovery on restart is supported then leave outstanding aggregations + // to the next restart + boolean shouldAbort = context.getNMStateStore().canRecover() + && !context.getDecommissioned(); // politely ask to finish for (AppLogAggregator aggregator : appLogAggregators.values()) { - aggregator.finishLogAggregation(); + if (shouldAbort) { + aggregator.abortLogAggregation(); + } else { + aggregator.finishLogAggregation(); + } } while (!threadPool.isTerminated()) { // wait for all threads to finish for (ApplicationId appId : appLogAggregators.keySet()) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Wed Aug 20 01:34:29 2014 @@ -35,19 +35,23 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; -import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; -import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; -import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -68,12 +72,17 @@ public class NMLeveldbStateStoreService private static final String DB_NAME = "yarn-nm-state"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; - private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion + private static final Version CURRENT_VERSION_INFO = Version .newInstance(1, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; + private static final String APPLICATIONS_KEY_PREFIX = + "ContainerManager/applications/"; + private static final String FINISHED_APPS_KEY_PREFIX = + "ContainerManager/finishedApps/"; + private static final String LOCALIZATION_KEY_PREFIX = "Localization/"; private static final String LOCALIZATION_PUBLIC_KEY_PREFIX = LOCALIZATION_KEY_PREFIX + "public/"; @@ -84,6 +93,14 @@ public class NMLeveldbStateStoreService private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/"; private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/"; + private static final String CONTAINERS_KEY_PREFIX = + "ContainerManager/containers/"; + private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; + private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; + private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; + private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; @@ -98,6 +115,8 @@ public class NMLeveldbStateStoreService private static final String CONTAINER_TOKENS_PREV_MASTER_KEY = CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private static final byte[] EMPTY_VALUE = new byte[0]; + private DB db; public NMLeveldbStateStoreService() { @@ -117,6 +136,246 @@ public class NMLeveldbStateStoreService @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + ArrayList<RecoveredContainerState> containers = + new ArrayList<RecoveredContainerState>(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + + while (iter.hasNext()) { + Entry<byte[],byte[]> entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { + break; + } + + int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); + if (idEndPos < 0) { + throw new IOException("Unable to determine container in key: " + key); + } + ContainerId containerId = ConverterUtils.toContainerId( + key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); + String keyPrefix = key.substring(0, idEndPos+1); + containers.add(loadContainerState(containerId, iter, keyPrefix)); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + return containers; + } + + private RecoveredContainerState loadContainerState(ContainerId containerId, + LeveldbIterator iter, String keyPrefix) throws IOException { + RecoveredContainerState rcs = new RecoveredContainerState(); + rcs.status = RecoveredContainerStatus.REQUESTED; + while (iter.hasNext()) { + Entry<byte[],byte[]> entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + iter.next(); + + String suffix = key.substring(keyPrefix.length()-1); // start with '/' + if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) { + rcs.startRequest = new StartContainerRequestPBImpl( + StartContainerRequestProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { + rcs.diagnostics = asString(entry.getValue()); + } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { + if (rcs.status == RecoveredContainerStatus.REQUESTED) { + rcs.status = RecoveredContainerStatus.LAUNCHED; + } + } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { + rcs.killed = true; + } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { + rcs.status = RecoveredContainerStatus.COMPLETED; + rcs.exitCode = Integer.parseInt(asString(entry.getValue())); + } else { + throw new IOException("Unexpected container state key: " + key); + } + } + return rcs; + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REQUEST_KEY_SUFFIX; + try { + db.put(bytes(key), + ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_DIAGS_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(diagnostics.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_LAUNCHED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_KILLED_KEY_SUFFIX; + try { + db.put(bytes(key), EMPTY_VALUE); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_EXIT_CODE_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(exitCode))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeContainer(ContainerId containerId) + throws IOException { + String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); + batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.applications = new ArrayList<ContainerManagerApplicationProto>(); + String keyPrefix = APPLICATIONS_KEY_PREFIX; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry<byte[], byte[]> entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + state.applications.add( + ContainerManagerApplicationProto.parseFrom(entry.getValue())); + } + + state.finishedApplications = new ArrayList<ApplicationId>(); + keyPrefix = FINISHED_APPS_KEY_PREFIX; + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry<byte[], byte[]> entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + ApplicationId appId = + ConverterUtils.toApplicationId(key.substring(keyPrefix.length())); + state.finishedApplications.add(appId); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + return state; + } + + @Override + public void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException { + String key = APPLICATIONS_KEY_PREFIX + appId; + try { + db.put(bytes(key), p.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeFinishedApplication(ApplicationId appId) + throws IOException { + String key = FINISHED_APPS_KEY_PREFIX + appId; + try { + db.put(bytes(key), new byte[0]); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeApplication(ApplicationId appId) + throws IOException { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String key = APPLICATIONS_KEY_PREFIX + appId; + batch.delete(bytes(key)); + key = FINISHED_APPS_KEY_PREFIX + appId; + batch.delete(bytes(key)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { RecoveredLocalizationState state = new RecoveredLocalizationState(); @@ -617,14 +876,14 @@ public class NMLeveldbStateStoreService } - NMDBSchemaVersion loadVersion() throws IOException { + Version loadVersion() throws IOException { byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); // if version is not stored previously, treat it as 1.0. if (data == null || data.length == 0) { - return NMDBSchemaVersion.newInstance(1, 0); + return Version.newInstance(1, 0); } - NMDBSchemaVersion version = - new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); return version; } @@ -634,14 +893,14 @@ public class NMLeveldbStateStoreService // Only used for test @VisibleForTesting - void storeVersion(NMDBSchemaVersion state) throws IOException { + void storeVersion(Version state) throws IOException { dbStoreVersion(state); } - private void dbStoreVersion(NMDBSchemaVersion state) throws IOException { + private void dbStoreVersion(Version state) throws IOException { String key = DB_SCHEMA_VERSION_KEY; byte[] data = - ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray(); + ((VersionPBImpl) state).getProto().toByteArray(); try { db.put(bytes(key), data); } catch (DBException e) { @@ -649,7 +908,7 @@ public class NMLeveldbStateStoreService } } - NMDBSchemaVersion getCurrentVersion() { + Version getCurrentVersion() { return CURRENT_VERSION_INFO; } @@ -664,9 +923,9 @@ public class NMLeveldbStateStoreService * upgrade NM state or remove incompatible old state. */ private void checkVersion() throws IOException { - NMDBSchemaVersion loadedVersion = loadVersion(); + Version loadedVersion = loadVersion(); LOG.info("Loaded NM state version info " + loadedVersion); - if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + if (loadedVersion.equals(getCurrentVersion())) { return; } if (loadedVersion.isCompatibleTo(getCurrentVersion())) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Wed Aug 20 01:34:29 2014 @@ -19,13 +19,16 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -43,6 +46,61 @@ public class NMNullStateStoreService ext } @Override + public RecoveredApplicationsState loadApplicationsState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException { + } + + @Override + public void storeFinishedApplication(ApplicationId appId) { + } + + @Override + public void removeApplication(ApplicationId appId) throws IOException { + } + + @Override + public List<RecoveredContainerState> loadContainersState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException { + } + + @Override + public void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException { + } + + @Override + public void storeContainerLaunched(ContainerId containerId) + throws IOException { + } + + @Override + public void storeContainerKilled(ContainerId containerId) + throws IOException { + } + + @Override + public void storeContainerCompleted(ContainerId containerId, int exitCode) + throws IOException { + } + + @Override + public void removeContainer(ContainerId containerId) throws IOException { + } + + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { throw new UnsupportedOperationException( Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Wed Aug 20 01:34:29 2014 @@ -29,10 +29,13 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -45,6 +48,53 @@ public abstract class NMStateStoreServic super(name); } + public static class RecoveredApplicationsState { + List<ContainerManagerApplicationProto> applications; + List<ApplicationId> finishedApplications; + + public List<ContainerManagerApplicationProto> getApplications() { + return applications; + } + + public List<ApplicationId> getFinishedApplications() { + return finishedApplications; + } + } + + public enum RecoveredContainerStatus { + REQUESTED, + LAUNCHED, + COMPLETED + } + + public static class RecoveredContainerState { + RecoveredContainerStatus status; + int exitCode = ContainerExitStatus.INVALID; + boolean killed = false; + String diagnostics = ""; + StartContainerRequest startRequest; + + public RecoveredContainerStatus getStatus() { + return status; + } + + public int getExitCode() { + return exitCode; + } + + public boolean getKilled() { + return killed; + } + + public String getDiagnostics() { + return diagnostics; + } + + public StartContainerRequest getStartRequest() { + return startRequest; + } + } + public static class LocalResourceTrackerState { List<LocalizedResourceProto> localizedResources = new ArrayList<LocalizedResourceProto>(); @@ -163,6 +213,100 @@ public abstract class NMStateStoreServic /** + * Load the state of applications + * @return recovered state for applications + * @throws IOException + */ + public abstract RecoveredApplicationsState loadApplicationsState() + throws IOException; + + /** + * Record the start of an application + * @param appId the application ID + * @param p state to store for the application + * @throws IOException + */ + public abstract void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException; + + /** + * Record that an application has finished + * @param appId the application ID + * @throws IOException + */ + public abstract void storeFinishedApplication(ApplicationId appId) + throws IOException; + + /** + * Remove records corresponding to an application + * @param appId the application ID + * @throws IOException + */ + public abstract void removeApplication(ApplicationId appId) + throws IOException; + + + /** + * Load the state of containers + * @return recovered state for containers + * @throws IOException + */ + public abstract List<RecoveredContainerState> loadContainersState() + throws IOException; + + /** + * Record a container start request + * @param containerId the container ID + * @param startRequest the container start request + * @throws IOException + */ + public abstract void storeContainer(ContainerId containerId, + StartContainerRequest startRequest) throws IOException; + + /** + * Record that a container has been launched + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerLaunched(ContainerId containerId) + throws IOException; + + /** + * Record that a container has completed + * @param containerId the container ID + * @param exitCode the exit code from the container + * @throws IOException + */ + public abstract void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException; + + /** + * Record a request to kill a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void storeContainerKilled(ContainerId containerId) + throws IOException; + + /** + * Record diagnostics for a container + * @param containerId the container ID + * @param diagnostics the container diagnostics + * @throws IOException + */ + public abstract void storeContainerDiagnostics(ContainerId containerId, + StringBuilder diagnostics) throws IOException; + + /** + * Remove records corresponding to a container + * @param containerId the container ID + * @throws IOException + */ + public abstract void removeContainer(ContainerId containerId) + throws IOException; + + + /** * Load the state of localized resources * @return recovered localized resource state * @throws IOException @@ -203,43 +347,111 @@ public abstract class NMStateStoreServic ApplicationId appId, Path localPath) throws IOException; + /** + * Load the state of the deletion service + * @return recovered deletion service state + * @throws IOException + */ public abstract RecoveredDeletionServiceState loadDeletionServiceState() throws IOException; + /** + * Record a deletion task + * @param taskId the deletion task ID + * @param taskProto the deletion task protobuf + * @throws IOException + */ public abstract void storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto) throws IOException; + /** + * Remove records corresponding to a deletion task + * @param taskId the deletion task ID + * @throws IOException + */ public abstract void removeDeletionTask(int taskId) throws IOException; + /** + * Load the state of NM tokens + * @return recovered state of NM tokens + * @throws IOException + */ public abstract RecoveredNMTokensState loadNMTokensState() throws IOException; + /** + * Record the current NM token master key + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous NM token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record a master key corresponding to an application + * @param attempt the application attempt ID + * @param key the master key + * @throws IOException + */ public abstract void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException; + /** + * Remove a master key corresponding to an application + * @param attempt the application attempt ID + * @throws IOException + */ public abstract void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException; + /** + * Load the state of container tokens + * @return recovered state of container tokens + * @throws IOException + */ public abstract RecoveredContainerTokensState loadContainerTokensState() throws IOException; + /** + * Record the current container token master key + * @param key the master key + * @throws IOException + */ public abstract void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException; + /** + * Record the previous container token master key + * @param key the previous master key + * @throws IOException + */ public abstract void storeContainerTokenPreviousMasterKey(MasterKey key) throws IOException; + /** + * Record the expiration time for a container token + * @param containerId the container ID + * @param expirationTime the container token expiration time + * @throws IOException + */ public abstract void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException; + /** + * Remove records for a container token + * @param containerId the container ID + * @throws IOException + */ public abstract void removeContainerToken(ContainerId containerId) throws IOException; Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Wed Aug 20 01:34:29 2014 @@ -55,7 +55,9 @@ public class WebServer extends AbstractS @Override protected void serviceStart() throws Exception { - String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()); + String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(), + YarnConfiguration.NM_BIND_HOST, + WebAppUtils.getNMWebAppURLWithoutScheme(getConfig())); LOG.info("Instantiating NMWebApp at " + bindAddress); try {
