http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java deleted file mode 100644 index b2d9dcb..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ /dev/null @@ -1,647 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; -import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; -import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; -import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Class extending {@link ContainerManagerImpl} and is used when queuing at the - * NM is enabled. - */ -public class QueuingContainerManagerImpl extends ContainerManagerImpl { - - private static final Logger LOG = LoggerFactory - .getLogger(QueuingContainerManagerImpl.class); - - private ConcurrentMap<ContainerId, AllocatedContainerInfo> - allocatedGuaranteedContainers; - private ConcurrentMap<ContainerId, AllocatedContainerInfo> - allocatedOpportunisticContainers; - - private long allocatedMemoryOpportunistic; - private int allocatedVCoresOpportunistic; - - private Queue<AllocatedContainerInfo> queuedGuaranteedContainers; - private Queue<AllocatedContainerInfo> queuedOpportunisticContainers; - - private Set<ContainerId> opportunisticContainersToKill; - private final OpportunisticContainersStatus opportunisticContainersStatus; - private final ContainerQueuingLimit queuingLimit; - - public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, - DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, - NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { - super(context, exec, deletionContext, nodeStatusUpdater, metrics, - dirsHandler); - this.allocatedGuaranteedContainers = new ConcurrentHashMap<>(); - this.allocatedOpportunisticContainers = new ConcurrentHashMap<>(); - 
this.allocatedMemoryOpportunistic = 0; - this.allocatedVCoresOpportunistic = 0; - this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>(); - this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); - this.opportunisticContainersToKill = Collections.synchronizedSet( - new HashSet<ContainerId>()); - this.opportunisticContainersStatus = - OpportunisticContainersStatus.newInstance(); - this.queuingLimit = ContainerQueuingLimit.newInstance(); - } - - @Override - protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() { - return new QueuingApplicationEventDispatcher( - super.createApplicationEventDispatcher()); - } - - @Override - protected void startContainerInternal( - ContainerTokenIdentifier containerTokenIdentifier, - StartContainerRequest request) throws YarnException, IOException { - this.context.getQueuingContext().getQueuedContainers().put( - containerTokenIdentifier.getContainerID(), containerTokenIdentifier); - - AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( - containerTokenIdentifier, request, - containerTokenIdentifier.getExecutionType(), containerTokenIdentifier - .getResource(), getConfig()); - - // If there are already free resources for the container to start, and - // there are no queued containers waiting to be executed, start this - // container immediately. - if (queuedGuaranteedContainers.isEmpty() && - queuedOpportunisticContainers.isEmpty() && - getContainersMonitor(). - hasResourcesAvailable(allocatedContInfo.getPti())) { - startAllocatedContainer(allocatedContInfo); - } else { - ContainerId cIdToStart = containerTokenIdentifier.getContainerID(); - LOG.info("No available resources for container {} to start its execution " - + "immediately.", cIdToStart); - if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.add(allocatedContInfo); - // Kill running opportunistic containers to make space for - // guaranteed container. 
- killOpportunisticContainers(allocatedContInfo); - } else { - LOG.info("Opportunistic container {} will be queued at the NM.", - cIdToStart); - queuedOpportunisticContainers.add(allocatedContInfo); - } - } - } - - @Override - protected void stopContainerInternal(ContainerId containerID) - throws YarnException, IOException { - Container container = this.context.getContainers().get(containerID); - // If container is null and distributed scheduling is enabled, container - // might be queued. Otherwise, container might not be handled by this NM. - if (container == null && this.context.getQueuingContext() - .getQueuedContainers().containsKey(containerID)) { - ContainerTokenIdentifier containerTokenId = this.context - .getQueuingContext().getQueuedContainers().remove(containerID); - - boolean foundInQueue = removeQueuedContainer(containerID, - containerTokenId.getExecutionType()); - - if (foundInQueue) { - LOG.info("Removing queued container with ID " + containerID); - this.context.getQueuingContext().getKilledQueuedContainers().put( - containerTokenId, - "Queued container request removed by ApplicationMaster."); - } else { - // The container started execution in the meanwhile. - try { - stopContainerInternalIfRunning(containerID); - } catch (YarnException | IOException e) { - LOG.error("Container did not get removed successfully.", e); - } - } - - nodeStatusUpdater.sendOutofBandHeartBeat(); - } else { - super.stopContainerInternal(containerID); - } - } - - /** - * Start the execution of the given container. Also add it to the allocated - * containers, and update allocated resource utilization. 
- */ - private void startAllocatedContainer( - AllocatedContainerInfo allocatedContainerInfo) { - ProcessTreeInfo pti = allocatedContainerInfo.getPti(); - - if (allocatedContainerInfo.getExecutionType() == - ExecutionType.GUARANTEED) { - allocatedGuaranteedContainers.put(pti.getContainerId(), - allocatedContainerInfo); - } else { - allocatedOpportunisticContainers.put(pti.getContainerId(), - allocatedContainerInfo); - allocatedMemoryOpportunistic += pti.getPmemLimit(); - allocatedVCoresOpportunistic += pti.getCpuVcores(); - } - - getContainersMonitor().increaseContainersAllocation(pti); - - // Start execution of container. - ContainerId containerId = allocatedContainerInfo - .getContainerTokenIdentifier().getContainerID(); - this.context.getQueuingContext().getQueuedContainers().remove(containerId); - try { - LOG.info("Starting container [" + containerId + "]"); - super.startContainerInternal( - allocatedContainerInfo.getContainerTokenIdentifier(), - allocatedContainerInfo.getStartRequest()); - } catch (YarnException | IOException e) { - containerFailedToStart(pti.getContainerId(), - allocatedContainerInfo.getContainerTokenIdentifier()); - LOG.error("Container failed to start.", e); - } - } - - private void containerFailedToStart(ContainerId containerId, - ContainerTokenIdentifier containerTokenId) { - this.context.getQueuingContext().getQueuedContainers().remove(containerId); - - removeAllocatedContainer(containerId); - - this.context.getQueuingContext().getKilledQueuedContainers().put( - containerTokenId, - "Container removed from queue as it failed to start."); - } - - /** - * Remove the given container from the container queues. - * - * @return true if the container was found in one of the queues. - */ - private boolean removeQueuedContainer(ContainerId containerId, - ExecutionType executionType) { - Queue<AllocatedContainerInfo> queue = - (executionType == ExecutionType.GUARANTEED) ? 
- queuedGuaranteedContainers : queuedOpportunisticContainers; - - boolean foundInQueue = false; - Iterator<AllocatedContainerInfo> iter = queue.iterator(); - while (iter.hasNext() && !foundInQueue) { - if (iter.next().getPti().getContainerId().equals(containerId)) { - iter.remove(); - foundInQueue = true; - } - } - - return foundInQueue; - } - - /** - * Remove the given container from the allocated containers, and update - * allocated container utilization accordingly. - */ - private void removeAllocatedContainer(ContainerId containerId) { - AllocatedContainerInfo contToRemove = null; - - contToRemove = allocatedGuaranteedContainers.remove(containerId); - - if (contToRemove == null) { - contToRemove = allocatedOpportunisticContainers.remove(containerId); - } - - // If container was indeed running, update allocated resource utilization. - if (contToRemove != null) { - getContainersMonitor().decreaseContainersAllocation(contToRemove - .getPti()); - - if (contToRemove.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - allocatedMemoryOpportunistic -= contToRemove.getPti().getPmemLimit(); - allocatedVCoresOpportunistic -= contToRemove.getPti().getCpuVcores(); - } - } - } - - /** - * Stop a container only if it is currently running. If queued, do not stop - * it. - */ - private void stopContainerInternalIfRunning(ContainerId containerID) - throws YarnException, IOException { - if (this.context.getContainers().containsKey(containerID)) { - stopContainerInternal(containerID); - } - } - - /** - * Kill opportunistic containers to free up resources for running the given - * container. - * - * @param allocatedContInfo - * the container whose execution needs to start by freeing up - * resources occupied by opportunistic containers. 
- */ - private void killOpportunisticContainers( - AllocatedContainerInfo allocatedContInfo) { - ContainerId containerToStartId = allocatedContInfo.getPti() - .getContainerId(); - List<ContainerId> extraOpportContainersToKill = - pickOpportunisticContainersToKill(containerToStartId); - - // Kill the opportunistic containers that were chosen. - for (ContainerId contIdToKill : extraOpportContainersToKill) { - try { - stopContainerInternalIfRunning(contIdToKill); - } catch (YarnException | IOException e) { - LOG.error("Container did not get removed successfully.", e); - } - LOG.info( - "Opportunistic container {} will be killed in order to start the " - + "execution of guaranteed container {}.", - contIdToKill, containerToStartId); - } - } - - /** - * Choose the opportunistic containers to kill in order to free up resources - * for running the given container. - * - * @param containerToStartId - * the container whose execution needs to start by freeing up - * resources occupied by opportunistic containers. - * @return the additional opportunistic containers that need to be killed. - */ - protected List<ContainerId> pickOpportunisticContainersToKill( - ContainerId containerToStartId) { - // The additional opportunistic containers that need to be killed for the - // given container to start. - List<ContainerId> extraOpportContainersToKill = new ArrayList<>(); - // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( - containerToStartId); - - // Go over the running opportunistic containers. Avoid containers that have - // already been marked for killing. - boolean hasSufficientResources = false; - for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont : - allocatedOpportunisticContainers.entrySet()) { - ContainerId runningOpportContId = runningOpportCont.getKey(); - - // If there are sufficient resources to execute the given container, do - // not kill more opportunistic containers. 
- if (resourcesToFreeUp.getPhysicalMemory() <= 0 && - resourcesToFreeUp.getVirtualMemory() <= 0 && - resourcesToFreeUp.getCPU() <= 0.0f) { - hasSufficientResources = true; - break; - } - - if (!opportunisticContainersToKill.contains(runningOpportContId)) { - extraOpportContainersToKill.add(runningOpportContId); - opportunisticContainersToKill.add(runningOpportContId); - getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp, - runningOpportCont.getValue().getPti()); - } - } - - if (!hasSufficientResources) { - LOG.info( - "There are no sufficient resources to start guaranteed {} even after " - + "attempting to kill any running opportunistic containers.", - containerToStartId); - } - - return extraOpportContainersToKill; - } - - /** - * Calculates the amount of resources that need to be freed up (by killing - * opportunistic containers) in order for the given guaranteed container to - * start its execution. Resource allocation to be freed up = - * <code>containersAllocation</code> - - * allocation of <code>opportunisticContainersToKill</code> + - * allocation of <code>queuedGuaranteedContainers</code> that will start - * before the given container + - * allocation of given container - - * total resources of node. - * - * @param containerToStartId - * the ContainerId of the guaranteed container for which we need to - * free resources, so that its execution can start. - * @return the resources that need to be freed up for the given guaranteed - * container to start. - */ - private ResourceUtilization resourcesToFreeUp( - ContainerId containerToStartId) { - // Get allocation of currently allocated containers. - ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization - .newInstance(getContainersMonitor().getContainersAllocation()); - - // Subtract from the allocation the allocation of the opportunistic - // containers that are marked for killing. 
- for (ContainerId opportContId : opportunisticContainersToKill) { - if (allocatedOpportunisticContainers.containsKey(opportContId)) { - getContainersMonitor().decreaseResourceUtilization( - resourceAllocationToFreeUp, - allocatedOpportunisticContainers.get(opportContId).getPti()); - } - } - // Add to the allocation the allocation of the pending guaranteed - // containers that will start before the current container will be started. - for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) { - getContainersMonitor().increaseResourceUtilization( - resourceAllocationToFreeUp, guarContInfo.getPti()); - if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) { - break; - } - } - // Subtract the overall node resources. - getContainersMonitor().subtractNodeResourcesFromResourceUtilization( - resourceAllocationToFreeUp); - return resourceAllocationToFreeUp; - } - - /** - * If there are available resources, try to start as many pending containers - * as possible. - */ - private void startPendingContainers() { - // Start pending guaranteed containers, if resources available. - boolean resourcesAvailable = - startContainersFromQueue(queuedGuaranteedContainers); - - // Start opportunistic containers, if resources available. 
- if (resourcesAvailable) { - startContainersFromQueue(queuedOpportunisticContainers); - } - } - - private boolean startContainersFromQueue( - Queue<AllocatedContainerInfo> queuedContainers) { - Iterator<AllocatedContainerInfo> guarIter = queuedContainers.iterator(); - boolean resourcesAvailable = true; - - while (guarIter.hasNext() && resourcesAvailable) { - AllocatedContainerInfo allocatedContInfo = guarIter.next(); - - if (getContainersMonitor().hasResourcesAvailable( - allocatedContInfo.getPti())) { - startAllocatedContainer(allocatedContInfo); - guarIter.remove(); - } else { - resourcesAvailable = false; - } - } - return resourcesAvailable; - } - - @Override - protected ContainerStatus getContainerStatusInternal(ContainerId containerID, - NMTokenIdentifier nmTokenIdentifier) throws YarnException { - Container container = this.context.getContainers().get(containerID); - if (container == null) { - ContainerTokenIdentifier containerTokenId = this.context - .getQueuingContext().getQueuedContainers().get(containerID); - if (containerTokenId != null) { - ExecutionType executionType = this.context.getQueuingContext() - .getQueuedContainers().get(containerID).getExecutionType(); - return BuilderUtils.newContainerStatus(containerID, - org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "", - ContainerExitStatus.INVALID, this.context.getQueuingContext() - .getQueuedContainers().get(containerID).getResource(), - executionType); - } else { - // Check if part of the stopped/killed queued containers. 
- for (ContainerTokenIdentifier cTokenId : this.context - .getQueuingContext().getKilledQueuedContainers().keySet()) { - if (cTokenId.getContainerID().equals(containerID)) { - return BuilderUtils.newContainerStatus(containerID, - org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, - this.context.getQueuingContext().getKilledQueuedContainers() - .get(cTokenId), ContainerExitStatus.ABORTED, cTokenId - .getResource(), cTokenId.getExecutionType()); - } - } - } - } - return super.getContainerStatusInternal(containerID, nmTokenIdentifier); - } - - @VisibleForTesting - public int getNumAllocatedGuaranteedContainers() { - return allocatedGuaranteedContainers.size(); - } - - @VisibleForTesting - public int getNumAllocatedOpportunisticContainers() { - return allocatedOpportunisticContainers.size(); - } - - @VisibleForTesting - public int getNumQueuedGuaranteedContainers() { - return queuedGuaranteedContainers.size(); - } - - @VisibleForTesting - public int getNumQueuedOpportunisticContainers() { - return queuedOpportunisticContainers.size(); - } - - class QueuingApplicationEventDispatcher implements - EventHandler<ApplicationEvent> { - private EventHandler<ApplicationEvent> applicationEventDispatcher; - - public QueuingApplicationEventDispatcher( - EventHandler<ApplicationEvent> applicationEventDispatcher) { - this.applicationEventDispatcher = applicationEventDispatcher; - } - - @Override - public void handle(ApplicationEvent event) { - if (event.getType() == - ApplicationEventType.APPLICATION_CONTAINER_FINISHED) { - if (!(event instanceof ApplicationContainerFinishedEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - ApplicationContainerFinishedEvent finishEvent = - (ApplicationContainerFinishedEvent) event; - // Remove finished container from the allocated containers, and - // attempt to start new containers. 
- ContainerId contIdToRemove = finishEvent.getContainerID(); - removeAllocatedContainer(contIdToRemove); - opportunisticContainersToKill.remove(contIdToRemove); - startPendingContainers(); - } - this.applicationEventDispatcher.handle(event); - } - } - - @Override - public OpportunisticContainersStatus getOpportunisticContainersStatus() { - opportunisticContainersStatus - .setRunningOpportContainers(allocatedOpportunisticContainers.size()); - opportunisticContainersStatus - .setOpportMemoryUsed(allocatedMemoryOpportunistic); - opportunisticContainersStatus - .setOpportCoresUsed(allocatedVCoresOpportunistic); - opportunisticContainersStatus - .setQueuedOpportContainers(queuedOpportunisticContainers.size()); - opportunisticContainersStatus.setWaitQueueLength( - queuedGuaranteedContainers.size() + - queuedOpportunisticContainers.size()); - return opportunisticContainersStatus; - } - - @Override - public void updateQueuingLimit(ContainerQueuingLimit limit) { - this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength()); - // TODO: Include wait time as well once it is implemented - if (this.queuingLimit.getMaxQueueLength() > -1) { - shedQueuedOpportunisticContainers(); - } - } - - private void shedQueuedOpportunisticContainers() { - int numAllowed = this.queuingLimit.getMaxQueueLength(); - Iterator<AllocatedContainerInfo> containerIter = - queuedOpportunisticContainers.iterator(); - while (containerIter.hasNext()) { - AllocatedContainerInfo cInfo = containerIter.next(); - if (numAllowed <= 0) { - containerIter.remove(); - ContainerTokenIdentifier containerTokenIdentifier = this.context - .getQueuingContext().getQueuedContainers().remove( - cInfo.getContainerTokenIdentifier().getContainerID()); - // The Container might have already started while we were - // iterating.. 
- if (containerTokenIdentifier != null) { - this.context.getQueuingContext().getKilledQueuedContainers() - .putIfAbsent(cInfo.getContainerTokenIdentifier(), - "Container de-queued to meet NM queuing limits. " - + "Max Queue length[" - + this.queuingLimit.getMaxQueueLength() + "]"); - } - } - numAllowed--; - } - } - - - static class AllocatedContainerInfo { - private final ContainerTokenIdentifier containerTokenIdentifier; - private final StartContainerRequest startRequest; - private final ExecutionType executionType; - private final ProcessTreeInfo pti; - - AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier, - StartContainerRequest startRequest, ExecutionType executionType, - Resource resource, Configuration conf) { - this.containerTokenIdentifier = containerTokenIdentifier; - this.startRequest = startRequest; - this.executionType = executionType; - this.pti = createProcessTreeInfo(containerTokenIdentifier - .getContainerID(), resource, conf); - } - - private ContainerTokenIdentifier getContainerTokenIdentifier() { - return this.containerTokenIdentifier; - } - - private StartContainerRequest getStartRequest() { - return this.startRequest; - } - - private ExecutionType getExecutionType() { - return this.executionType; - } - - protected ProcessTreeInfo getPti() { - return this.pti; - } - - private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId, - Resource resource, Configuration conf) { - long pmemBytes = resource.getMemorySize() * 1024 * 1024L; - float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - long vmemBytes = (long) (pmemRatio * pmemBytes); - int cpuVcores = resource.getVirtualCores(); - - return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes, - cpuVcores); - } - - @Override - public boolean equals(Object obj) { - boolean equal = false; - if (obj instanceof AllocatedContainerInfo) { - AllocatedContainerInfo otherContInfo = 
(AllocatedContainerInfo) obj; - equal = this.getPti().getContainerId() - .equals(otherContInfo.getPti().getContainerId()); - } - return equal; - } - - @Override - public int hashCode() { - return this.getPti().getContainerId().hashCode(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java deleted file mode 100644 index 0250807..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -/** - * This package contains classes related to the queuing of containers at - * the NM. 
- * - */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing; http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java new file mode 100644 index 0000000..9665e75 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link ResourceUtilizationTracker} that equates
+ * resource utilization with the total resource allocated to the container.
+ * <p>
+ * The running total is kept in one {@link ResourceUtilization} instance that
+ * is updated in place whenever a container is added or subtracted.
+ * NOTE(review): nothing here is synchronized; presumably all calls arrive on
+ * the scheduler's event-handling thread -- confirm.
+ */
+public class AllocationBasedResourceUtilizationTracker implements
+    ResourceUtilizationTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+
+  // Sum of the allocations of every container added and not yet subtracted.
+  private final ResourceUtilization containersAllocation;
+  // Only used to reach the ContainersMonitor, which holds the node limits.
+  private final ContainerScheduler scheduler;
+
+  /**
+   * Create a tracker that starts with nothing allocated.
+   * @param scheduler ContainerScheduler used to look up the ContainersMonitor.
+   */
+  AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) {
+    this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Get the accumulated resources allocated to the tracked containers.
+   * The returned object is the tracker's own live instance, not a copy.
+   * @return ResourceUtilization Resource Utilization.
+   */
+  @Override
+  public ResourceUtilization getCurrentUtilization() {
+    return this.containersAllocation;
+  }
+
+  /**
+   * Add Container's resources to the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void addContainerResources(Container container) {
+    ContainersMonitor.ContainerManagerUtils.increaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Subtract Container's resources from the accumulated Utilization.
+   * @param container Container.
+   */
+  @Override
+  public void subtractContainerResource(Container container) {
+    ContainersMonitor.ContainerManagerUtils.decreaseResourceUtilization(
+        getContainersMonitor(), this.containersAllocation,
+        container.getResource());
+  }
+
+  /**
+   * Check if NM has resources available currently to run the container.
+   * The virtual memory asked for is derived from the container's physical
+   * memory multiplied by the monitor's vmem ratio.
+   * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
+   */
+  @Override
+  public boolean hasResourcesAvailable(Container container) {
+    long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
+    return hasResourcesAvailable(pMemBytes,
+        (long) (getContainersMonitor().getVmemRatio() * pMemBytes),
+        container.getResource().getVirtualCores());
+  }
+
+  /**
+   * Check the request against the physical memory, virtual memory (only when
+   * the vmem check is enabled) and CPU limits reported by the monitor.
+   * @param pMemBytes physical memory asked for, in bytes.
+   * @param vMemBytes virtual memory asked for, in bytes.
+   * @param cpuVcores virtual cores asked for.
+   * @return false as soon as one limit would be exceeded, true otherwise.
+   */
+  private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
+      int cpuVcores) {
+    final ContainersMonitor monitor = getContainersMonitor();
+
+    // Check physical memory. Byte counts are shifted down by 20 bits before
+    // being compared with the tracked allocation; the sum is done in long so
+    // that it cannot wrap around.
+    final long pMemCurrent = this.containersAllocation.getPhysicalMemory();
+    final long pMemAsked = pMemBytes >> 20;
+    final long pMemAllowed = monitor.getPmemAllocatedForContainers() >> 20;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
+          pMemCurrent, pMemAsked, pMemAllowed);
+    }
+    if (pMemCurrent + pMemAsked > pMemAllowed) {
+      return false;
+    }
+
+    // Check virtual memory, but only when the monitor enforces it.
+    final boolean vMemCheckEnabled = monitor.isVmemCheckEnabled();
+    final long vMemCurrent = this.containersAllocation.getVirtualMemory();
+    final long vMemAsked = vMemBytes >> 20;
+    final long vMemAllowed = monitor.getVmemAllocatedForContainers() >> 20;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before vMemCheck " +
+          "[isEnabled={}, current={} + asked={} > allowed={}]",
+          vMemCheckEnabled, vMemCurrent, vMemAsked, vMemAllowed);
+    }
+    if (vMemCheckEnabled && vMemCurrent + vMemAsked > vMemAllowed) {
+      return false;
+    }
+
+    // Check CPU. The request is expressed as a fraction of the vcores the
+    // monitor has allocated for containers, so the limit is 1.0.
+    final float cpuCurrent = this.containersAllocation.getCPU();
+    final float vCores = (float) cpuVcores /
+        monitor.getVCoresAllocatedForContainers();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("before cpuCheck [current={} + asked={} > allowed={}]",
+          cpuCurrent, vCores, 1.0f);
+    }
+    if (cpuCurrent + vCores > 1.0f) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Get the ContainersMonitor of the owning scheduler.
+   * @return ContainersMonitor.
+   */
+  public ContainersMonitor getContainersMonitor() {
+    return this.scheduler.getContainersMonitor();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
new file mode 100644
index 0000000..9a3cf30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The ContainerScheduler manages a collection of runnable containers. It
+ * ensures that a container is launched only if all its launch criteria are
+ * met. It also ensures that OPPORTUNISTIC containers are killed to make
+ * room for GUARANTEED containers.
+ * <p>
+ * NOTE(review): the queues and maps below are plain, unsynchronized
+ * collections; presumably they are only mutated on the thread that delivers
+ * {@link ContainerSchedulerEvent}s -- confirm before relying on the public
+ * accessors from other threads.
+ */
+public class ContainerScheduler extends AbstractService implements
+    EventHandler<ContainerSchedulerEvent> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerScheduler.class);
+
+  private final Context context;
+  // Upper bound on queued opportunistic containers; 0 disables queuing, in
+  // which case every container is started as soon as it is scheduled.
+  private final int maxOppQueueLength;
+
+  // Queue of Guaranteed Containers waiting for resources to run
+  private final LinkedHashMap<ContainerId, Container>
+      queuedGuaranteedContainers = new LinkedHashMap<>();
+  // Queue of Opportunistic Containers waiting for resources to run
+  private final LinkedHashMap<ContainerId, Container>
+      queuedOpportunisticContainers = new LinkedHashMap<>();
+
+  // Used to keep track of containers that have been marked to be killed
+  // to make room for a guaranteed container.
+  private final Map<ContainerId, Container> oppContainersToKill =
+      new HashMap<>();
+
+  // Containers launched by the Scheduler will take a while to actually
+  // move to the RUNNING state, but should still be fair game for killing
+  // by the scheduler to make room for guaranteed containers. This holds
+  // containers that are in RUNNING as well as those in SCHEDULED state that
+  // have been marked to run, but not yet RUNNING.
+  private final LinkedHashMap<ContainerId, Container> runningContainers =
+      new LinkedHashMap<>();
+
+  private final ContainerQueuingLimit queuingLimit =
+      ContainerQueuingLimit.newInstance();
+
+  private final OpportunisticContainersStatus opportunisticContainersStatus;
+
+  // Resource Utilization Tracker that decides how utilization of the cluster
+  // increases / decreases based on container start / finish
+  private final ResourceUtilizationTracker utilizationTracker;
+
+  private final AsyncDispatcher dispatcher;
+  private final NodeManagerMetrics metrics;
+
+  /**
+   * Instantiate a Container Scheduler. The maximum opportunistic queue
+   * length is read from the NodeManager configuration.
+   * @param context NodeManager Context.
+   * @param dispatcher AsyncDispatcher.
+   * @param metrics NodeManagerMetrics.
+   */
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics) {
+    this(context, dispatcher, metrics, context.getConf().getInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        YarnConfiguration.
+            NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
+  }
+
+  /**
+   * Instantiate a Container Scheduler with an explicit queue length.
+   * @param context NodeManager Context.
+   * @param dispatcher AsyncDispatcher.
+   * @param metrics NodeManagerMetrics.
+   * @param qLength maximum number of queued opportunistic containers;
+   *                values below zero are treated as zero.
+   */
+  @VisibleForTesting
+  public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+      NodeManagerMetrics metrics, int qLength) {
+    super(ContainerScheduler.class.getName());
+    this.context = context;
+    this.dispatcher = dispatcher;
+    this.metrics = metrics;
+    this.maxOppQueueLength = Math.max(qLength, 0);
+    this.utilizationTracker =
+        new AllocationBasedResourceUtilizationTracker(this);
+    this.opportunisticContainersStatus =
+        OpportunisticContainersStatus.newInstance();
+  }
+
+  /**
+   * Handle ContainerSchedulerEvents.
+   * @param event ContainerSchedulerEvent.
+   */
+  @Override
+  public void handle(ContainerSchedulerEvent event) {
+    switch (event.getType()) {
+    case SCHEDULE_CONTAINER:
+      scheduleContainer(event.getContainer());
+      break;
+    case CONTAINER_COMPLETED:
+      onContainerCompleted(event.getContainer());
+      break;
+    case SHED_QUEUED_CONTAINERS:
+      shedQueuedOpportunisticContainers();
+      break;
+    default:
+      LOG.error("Unknown event arrived at ContainerScheduler: {}", event);
+    }
+  }
+
+  /**
+   * Return number of queued containers.
+   * @return Number of queued containers.
+   */
+  public int getNumQueuedContainers() {
+    return this.queuedGuaranteedContainers.size()
+        + this.queuedOpportunisticContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumQueuedGuaranteedContainers() {
+    return this.queuedGuaranteedContainers.size();
+  }
+
+  @VisibleForTesting
+  public int getNumQueuedOpportunisticContainers() {
+    return this.queuedOpportunisticContainers.size();
+  }
+
+  /**
+   * Refresh and return the status of opportunistic containers on this node.
+   * Queue lengths come from this scheduler, the usage figures from the
+   * NodeManager metrics. The same status instance is updated on every call.
+   * @return OpportunisticContainersStatus.
+   */
+  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
+    this.opportunisticContainersStatus.setQueuedOpportContainers(
+        getNumQueuedOpportunisticContainers());
+    this.opportunisticContainersStatus.setWaitQueueLength(
+        getNumQueuedContainers());
+    this.opportunisticContainersStatus.setOpportMemoryUsed(
+        metrics.getOpportMemoryUsed());
+    this.opportunisticContainersStatus.setOpportCoresUsed(
+        metrics.getOpportCoresUsed());
+    this.opportunisticContainersStatus.setRunningOpportContainers(
+        metrics.getRunningOpportContainers());
+    return this.opportunisticContainersStatus;
+  }
+
+  /**
+   * Drop all bookkeeping for a completed container and, if it had been
+   * started by this scheduler, release its resources and try to start
+   * queued containers.
+   * @param container Container that completed.
+   */
+  private void onContainerCompleted(Container container) {
+    oppContainersToKill.remove(container.getContainerId());
+
+    // This could be killed externally for eg. by the ContainerManager,
+    // in which case, the container might still be queued.
+    Container queued =
+        queuedOpportunisticContainers.remove(container.getContainerId());
+    if (queued == null) {
+      queuedGuaranteedContainers.remove(container.getContainerId());
+    }
+
+    // decrement only if it was a running container
+    Container completedContainer = runningContainers.remove(container
+        .getContainerId());
+    if (completedContainer != null) {
+      this.utilizationTracker.subtractContainerResource(container);
+      if (container.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+        this.metrics.opportunisticContainerCompleted(container);
+      }
+      startPendingContainers();
+    }
+  }
+
+  /**
+   * Start queued containers while resources last: guaranteed containers
+   * first, opportunistic ones only if the guaranteed queue was drained.
+   */
+  private void startPendingContainers() {
+    // Start pending guaranteed containers, if resources available.
+    boolean resourcesAvailable =
+        startContainersFromQueue(queuedGuaranteedContainers.values());
+    // Start opportunistic containers, if resources available.
+    if (resourcesAvailable) {
+      startContainersFromQueue(queuedOpportunisticContainers.values());
+    }
+  }
+
+  /**
+   * Start containers from the head of the given queue, removing each one
+   * that is started, and stop at the first container that does not fit.
+   * @param queuedContainers view of the queue to drain.
+   * @return true if every queued container could be started.
+   */
+  private boolean startContainersFromQueue(
+      Collection<Container> queuedContainers) {
+    Iterator<Container> cIter = queuedContainers.iterator();
+    boolean resourcesAvailable = true;
+    while (cIter.hasNext() && resourcesAvailable) {
+      Container container = cIter.next();
+      if (this.utilizationTracker.hasResourcesAvailable(container)) {
+        startAllocatedContainer(container);
+        cIter.remove();
+      } else {
+        resourcesAvailable = false;
+      }
+    }
+    return resourcesAvailable;
+  }
+
+  /**
+   * Start the container right away when possible, otherwise queue it.
+   * A guaranteed container that has to wait triggers the killing of running
+   * opportunistic containers; an opportunistic container that does not fit
+   * into the queue is killed.
+   * @param container Container to schedule.
+   */
+  @VisibleForTesting
+  protected void scheduleContainer(Container container) {
+    if (maxOppQueueLength <= 0) {
+      // Queuing is disabled.
+      startAllocatedContainer(container);
+      return;
+    }
+    // Only jump ahead when nothing is waiting, so that queued containers
+    // are not overtaken.
+    if (queuedGuaranteedContainers.isEmpty() &&
+        queuedOpportunisticContainers.isEmpty() &&
+        this.utilizationTracker.hasResourcesAvailable(container)) {
+      startAllocatedContainer(container);
+      return;
+    }
+
+    LOG.info("No available resources for container {} to start its execution "
+        + "immediately.", container.getContainerId());
+    if (container.getContainerTokenIdentifier().getExecutionType() ==
+        ExecutionType.GUARANTEED) {
+      queuedGuaranteedContainers.put(container.getContainerId(), container);
+      // Kill running opportunistic containers to make space for
+      // guaranteed container.
+      killOpportunisticContainers(container);
+    } else if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+      // Strictly below the limit, so the queue never grows past
+      // maxOppQueueLength entries.
+      LOG.info("Opportunistic container {} will be queued at the NM.",
+          container.getContainerId());
+      queuedOpportunisticContainers.put(
+          container.getContainerId(), container);
+    } else {
+      LOG.info("Opportunistic container [{}] will not be queued at the NM "
+          + "since max queue length [{}] has been reached",
+          container.getContainerId(), maxOppQueueLength);
+      container.sendKillEvent(
+          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+          "Opportunistic container queue is full.");
+    }
+    // TODO: recording the queued state in the NM state store
+    // (NMStateStore#storeContainerQueued) is currently disabled.
+  }
+
+  /**
+   * Send kill events to the opportunistic containers picked to make room
+   * for the given guaranteed container, and remember them as being killed.
+   * @param container guaranteed Container waiting to start.
+   */
+  private void killOpportunisticContainers(Container container) {
+    List<Container> extraOpportContainersToKill =
+        pickOpportunisticContainersToKill(container.getContainerId());
+    // Kill the opportunistic containers that were chosen.
+    for (Container contToKill : extraOpportContainersToKill) {
+      contToKill.sendKillEvent(
+          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+          "Container Killed to make room for Guaranteed Container.");
+      oppContainersToKill.put(contToKill.getContainerId(), contToKill);
+      LOG.info(
+          "Opportunistic container {} will be killed in order to start the "
+              + "execution of guaranteed container {}.",
+          contToKill.getContainerId(), container.getContainerId());
+    }
+  }
+
+  /**
+   * Record the container as running, account for its resources and send it
+   * the launch event.
+   * @param container Container to start.
+   */
+  private void startAllocatedContainer(Container container) {
+    LOG.info("Starting container [{}]", container.getContainerId());
+    runningContainers.put(container.getContainerId(), container);
+    this.utilizationTracker.addContainerResources(container);
+    if (container.getContainerTokenIdentifier().getExecutionType() ==
+        ExecutionType.OPPORTUNISTIC) {
+      this.metrics.opportunisticContainerStarted(container);
+    }
+    container.sendLaunchEvent();
+  }
+
+  /**
+   * Choose running opportunistic containers whose resources, once freed,
+   * cover what the given guaranteed container still lacks. Containers that
+   * are already marked to be killed are not picked again.
+   * @param containerToStartId id of the guaranteed container to start.
+   * @return the additional containers to kill, most recently started first.
+   */
+  private List<Container> pickOpportunisticContainersToKill(
+      ContainerId containerToStartId) {
+    // The opportunistic containers that need to be killed for the
+    // given container to start.
+    List<Container> extraOpportContainersToKill = new ArrayList<>();
+    // Track resources that need to be freed.
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+        containerToStartId);
+
+    // Go over the running opportunistic containers.
+    // Use a descending iterator to kill more recently started containers.
+    Iterator<Container> lifoIterator = new LinkedList<>(
+        runningContainers.values()).descendingIterator();
+    while (lifoIterator.hasNext() &&
+        !hasSufficientResources(resourcesToFreeUp)) {
+      Container runningCont = lifoIterator.next();
+      if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.OPPORTUNISTIC) {
+
+        if (oppContainersToKill.containsKey(
+            runningCont.getContainerId())) {
+          // These containers have already been marked to be killed.
+          // So exclude them..
+          continue;
+        }
+        extraOpportContainersToKill.add(runningCont);
+        ContainersMonitor.ContainerManagerUtils.decreaseResourceUtilization(
+            getContainersMonitor(), resourcesToFreeUp,
+            runningCont.getResource());
+      }
+    }
+    if (!hasSufficientResources(resourcesToFreeUp)) {
+      LOG.warn("There are no sufficient resources to start guaranteed [{}] "
+          + "at the moment. Opportunistic containers are in the process of "
+          + "being killed to make room.", containerToStartId);
+    }
+    return extraOpportContainersToKill;
+  }
+
+  /**
+   * Tell whether nothing remains to be freed.
+   * @param resourcesToFreeUp outstanding amount that still has to be freed.
+   * @return true if memory, virtual memory and CPU are all at or below zero.
+   */
+  private boolean hasSufficientResources(
+      ResourceUtilization resourcesToFreeUp) {
+    return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
+        resourcesToFreeUp.getVirtualMemory() <= 0 &&
+        resourcesToFreeUp.getCPU() <= 0.0f;
+  }
+
+  /**
+   * Compute how much has to be freed before the given guaranteed container
+   * can start. Positive components of the result are shortfalls.
+   * @param containerToStartId id of the guaranteed container to start.
+   * @return a new ResourceUtilization holding the amount to free up.
+   */
+  private ResourceUtilization resourcesToFreeUp(
+      ContainerId containerToStartId) {
+    // Get allocation of currently allocated containers.
+    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
+        .newInstance(this.utilizationTracker.getCurrentUtilization());
+
+    // Add to the allocation the allocation of the pending guaranteed
+    // containers that will start before the current container will be started.
+    for (Container container : queuedGuaranteedContainers.values()) {
+      ContainersMonitor.ContainerManagerUtils.increaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
+      if (container.getContainerId().equals(containerToStartId)) {
+        break;
+      }
+    }
+
+    // These resources are being freed, likely at the behest of another
+    // guaranteed container..
+    for (Container container : oppContainersToKill.values()) {
+      ContainersMonitor.ContainerManagerUtils.decreaseResourceUtilization(
+          getContainersMonitor(), resourceAllocationToFreeUp,
+          container.getResource());
+    }
+
+    // Subtract the overall node resources.
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        resourceAllocationToFreeUp);
+    return resourceAllocationToFreeUp;
+  }
+
+  /**
+   * Adopt a new queuing limit and, when the opportunistic queue is longer
+   * than the new limit, dispatch a SHED_QUEUED_CONTAINERS event. The event
+   * carries no container.
+   * @param limit ContainerQueuingLimit to apply.
+   */
+  @SuppressWarnings("unchecked")
+  public void updateQueuingLimit(ContainerQueuingLimit limit) {
+    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
+    // YARN-2886 should add support for wait-times. Include wait time as
+    // well once it is implemented
+    if ((queuingLimit.getMaxQueueLength() > -1) &&
+        (queuingLimit.getMaxQueueLength() <
+            queuedOpportunisticContainers.size())) {
+      dispatcher.getEventHandler().handle(
+          new ContainerSchedulerEvent(null,
+              ContainerSchedulerEventType.SHED_QUEUED_CONTAINERS));
+    }
+  }
+
+  /**
+   * Keep the oldest queued opportunistic containers up to the current
+   * queuing limit; kill and de-queue all the others.
+   */
+  private void shedQueuedOpportunisticContainers() {
+    int numAllowed = this.queuingLimit.getMaxQueueLength();
+    Iterator<Container> containerIter =
+        queuedOpportunisticContainers.values().iterator();
+    while (containerIter.hasNext()) {
+      Container container = containerIter.next();
+      if (numAllowed <= 0) {
+        container.sendKillEvent(
+            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+            "Container De-queued to meet NM queuing limits.");
+        containerIter.remove();
+        LOG.info(
+            "Opportunistic container {} will be killed to meet NM queuing" +
+                " limits.", container.getContainerId());
+      }
+      numAllowed--;
+    }
+  }
+
+  /**
+   * Get the ContainersMonitor of the NodeManager's ContainerManager.
+   * @return ContainersMonitor.
+   */
+  public ContainersMonitor getContainersMonitor() {
+    return this.context.getContainerManager().getContainersMonitor();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
new file mode 100644
index 0000000..460aaeb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+    .Container;
+
+/**
+ * An event delivered to the {@link ContainerScheduler}: an event type plus
+ * the container it concerns.
+ */
+public class ContainerSchedulerEvent
+    extends AbstractEvent<ContainerSchedulerEventType> {
+
+  // The container this event is about; stored exactly as passed in.
+  private final Container container;
+
+  /**
+   * Build an event of the given type for the given container.
+   * @param container Container the event refers to. The scheduler passes
+   *                  null here for SHED_QUEUED_CONTAINERS events.
+   * @param eventType type of the event.
+   */
+  public ContainerSchedulerEvent(Container container,
+      ContainerSchedulerEventType eventType) {
+    super(eventType);
+    this.container = container;
+  }
+
+  /**
+   * Return the container this event was created with.
+   * @return the Container, which is null when the event was built without one.
+   */
+  public Container getContainer() {
+    return this.container;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
new file mode 100644
index 0000000..086cb9b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+/**
+ * Event types associated with {@link ContainerSchedulerEvent}. Each type is
+ * dispatched by {@code ContainerScheduler#handle} to one scheduler action.
+ */
+public enum ContainerSchedulerEventType {
+  // The event's container should be started, or queued when it cannot
+  // start right away.
+  SCHEDULE_CONTAINER,
+  // The event's container has completed; the scheduler drops its
+  // bookkeeping for it and may start queued containers.
+  CONTAINER_COMPLETED,
+  // Producer: Node HB response - RM has asked to shed the queue
+  // The scheduler creates this event without a container.
+  SHED_QUEUED_CONTAINERS,
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
new file mode 100644
index 0000000..3c17eca
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+/**
+ * This interface abstracts out how a container contributes to
+ * Resource Utilization of the node.
+ * It is used by the {@link ContainerScheduler} to decide whether a container
+ * can be started and to determine which OPPORTUNISTIC containers are to be
+ * killed to make room for a GUARANTEED container.
+ */
+public interface ResourceUtilizationTracker {
+
+  /**
+   * Get the current total utilization of all the Containers running on
+   * the node.
+   * @return ResourceUtilization Resource Utilization.
+   */
+  ResourceUtilization getCurrentUtilization();
+
+  /**
+   * Add Container's resources to Node Utilization.
+   * The scheduler calls this when it starts a container.
+   * @param container Container.
+   */
+  void addContainerResources(Container container);
+
+  /**
+   * Subtract Container's resources from Node Utilization.
+   * The scheduler calls this when a container it had started completes.
+   * @param container Container.
+   */
+  void subtractContainerResource(Container container);
+
+  /**
+   * Check if NM has resources available currently to run the container.
+   * @param container Container.
+   * @return True, if NM has resources available currently to run the container.
+   */
+  boolean hasResourcesAvailable(Container container);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
new file mode 100644
index 0000000..4641ac0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +/** + * Container Scheduler + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java index 6105eff..b001b63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java @@ -23,11 +23,14 @@ import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.yarn.api.records.Resource; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .Container; @Metrics(about="Metrics for node manager", context="yarn") public class NodeManagerMetrics { @@ -60,6 +63,14 @@ public class NodeManagerMetrics { MutableGaugeInt goodLocalDirsDiskUtilizationPerc; @Metric("Disk utilization % on good log 
dirs") MutableGaugeInt goodLogDirsDiskUtilizationPerc; + + @Metric("Memory used by Opportunistic Containers in MB") + MutableGaugeLong opportMemoryUsed; + @Metric("# of Virtual Cores used by opportunistic containers") + MutableGaugeInt opportCoresUsed; + @Metric("# of running opportunistic containers") + MutableGaugeInt runningOpportContainers; + // CHECKSTYLE:ON:VisibilityModifier private JvmMetrics jvmMetrics = null; @@ -130,6 +141,30 @@ public class NodeManagerMetrics { containersReIniting.decr(); } + public long getOpportMemoryUsed() { + return opportMemoryUsed.value(); + } + + public int getOpportCoresUsed() { + return opportCoresUsed.value(); + } + + public int getRunningOpportContainers() { + return runningOpportContainers.value(); + } + + public void opportunisticContainerCompleted(Container container) { + opportMemoryUsed.decr(container.getResource().getMemorySize()); + opportCoresUsed.decr(container.getResource().getVirtualCores()); + runningOpportContainers.decr(); + } + + public void opportunisticContainerStarted(Container container) { + opportMemoryUsed.incr(container.getResource().getMemorySize()); + opportCoresUsed.incr(container.getResource().getVirtualCores()); + runningOpportContainers.incr(); + } + public void allocateContainer(Resource res) { allocatedContainers.incr(); allocatedMB = allocatedMB + res.getMemorySize(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java index 35e7593..e1a9995 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; @@ -38,7 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; -import org.apache.hadoop.yarn.util.ConverterUtils; + import org.apache.hadoop.yarn.webapp.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +148,7 @@ public class ContainerLogsUtils { private static void checkState(ContainerState state) { if (state == ContainerState.NEW || state == ContainerState.LOCALIZING || - state == ContainerState.LOCALIZED) { + state == ContainerState.SCHEDULED) { throw new NotFoundException("Container is not yet running. 
Current state is " + state); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 3b84a78..8e4522b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.FileContext; @@ -158,7 +159,7 @@ public class TestEventFlow { containerManager.startContainers(allRequests); BaseContainerManagerTest.waitForContainerState(containerManager, cID, - ContainerState.RUNNING); + Arrays.asList(ContainerState.RUNNING, ContainerState.SCHEDULED), 20); List<ContainerId> containerIds = new ArrayList<ContainerId>(); containerIds.add(cID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index f6593f9..04cfae9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -454,6 +454,14 @@ public class TestNodeManagerResync { if (containersShouldBePreserved) { Assert.assertFalse(containers.isEmpty()); Assert.assertTrue(containers.containsKey(existingCid)); + ContainerState state = containers.get(existingCid) + .cloneAndGetContainerStatus().getState(); + // Wait till RUNNING state... 
+ int counter = 50; + while (state != ContainerState.RUNNING && counter > 0) { + Thread.sleep(100); + counter--; + } Assert.assertEquals(ContainerState.RUNNING, containers.get(existingCid) .cloneAndGetContainerStatus().getState()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index b3ad318..03e06d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -255,7 +256,9 @@ public class TestNodeManagerShutdown { GetContainerStatusesRequest.newInstance(containerIds); ContainerStatus containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0); - Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); + Assert.assertTrue( + EnumSet.of(ContainerState.RUNNING, ContainerState.SCHEDULED) + .contains(containerStatus.getState())); } public static ContainerId createContainerId() { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index d76aa35..c679b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -65,7 +65,6 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -1080,128 +1079,6 @@ public class TestNodeStatusUpdater { Assert.assertTrue(containerIdSet.contains(runningContainerId)); } - @Test(timeout = 90000) - public void testKilledQueuedContainers() throws Exception { - NodeManager nm = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set( - NodeStatusUpdaterImpl - .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, - "10000"); - nm.init(conf); - NodeStatusUpdaterImpl nodeStatusUpdater = - 
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 0); - - // Add application to context. - nm.getNMContext().getApplications().putIfAbsent(appId, - mock(Application.class)); - - // Create a running container and add it to the context. - ContainerId runningContainerId = - ContainerId.newContainerId(appAttemptId, 1); - Token runningContainerToken = - BuilderUtils.newContainerToken(runningContainerId, 0, "anyHost", - 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, - "password".getBytes(), 0); - Container runningContainer = - new ContainerImpl(conf, null, null, null, null, - BuilderUtils.newContainerTokenIdentifier(runningContainerToken), - nm.getNMContext()) { - @Override - public ContainerState getCurrentState() { - return ContainerState.RUNNING; - } - - @Override - public org.apache.hadoop.yarn.server.nodemanager.containermanager. - container.ContainerState getContainerState() { - return org.apache.hadoop.yarn.server.nodemanager.containermanager. - container.ContainerState.RUNNING; - } - }; - - nm.getNMContext().getContainers() - .put(runningContainerId, runningContainer); - - // Create two killed queued containers and add them to the queuing context. 
- ContainerId killedQueuedContainerId1 = ContainerId.newContainerId( - appAttemptId, 2); - ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils - .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - killedQueuedContainerId1, 0, "anyHost", 1234, "anyUser", - BuilderUtils.newResource(1024, 1), 0, 123, - "password".getBytes(), 0)); - ContainerId killedQueuedContainerId2 = ContainerId.newContainerId( - appAttemptId, 3); - ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils - .newContainerTokenIdentifier(BuilderUtils.newContainerToken( - killedQueuedContainerId2, 0, "anyHost", 1234, "anyUser", - BuilderUtils.newResource(1024, 1), 0, 123, - "password".getBytes(), 0)); - - nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put( - killedQueuedContainerTokenId1, "Queued container killed."); - nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put( - killedQueuedContainerTokenId2, "Queued container killed."); - - List<ContainerStatus> containerStatuses = nodeStatusUpdater - .getContainerStatuses(); - - Assert.assertEquals(3, containerStatuses.size()); - - ContainerStatus runningContainerStatus = null; - ContainerStatus killedQueuedContainerStatus1 = null; - ContainerStatus killedQueuedContainerStatus2 = null; - for (ContainerStatus cStatus : containerStatuses) { - if (ContainerState.RUNNING == cStatus.getState()) { - runningContainerStatus = cStatus; - } - if (ContainerState.COMPLETE == cStatus.getState()) { - if (killedQueuedContainerId1.equals(cStatus.getContainerId())) { - killedQueuedContainerStatus1 = cStatus; - } else { - killedQueuedContainerStatus2 = cStatus; - } - } - } - - // Check container IDs and Container Status. - Assert.assertNotNull(runningContainerId); - Assert.assertNotNull(killedQueuedContainerId1); - Assert.assertNotNull(killedQueuedContainerId2); - - // Killed queued container should have ABORTED exit status. 
- Assert.assertEquals(ContainerExitStatus.ABORTED, - killedQueuedContainerStatus1.getExitStatus()); - Assert.assertEquals(ContainerExitStatus.ABORTED, - killedQueuedContainerStatus2.getExitStatus()); - - // Killed queued container should appear in the recentlyStoppedContainers. - Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped( - killedQueuedContainerId1)); - Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped( - killedQueuedContainerId2)); - - // Check if killed queued containers are successfully removed from the - // queuing context. - List<ContainerId> ackedContainers = new ArrayList<ContainerId>(); - ackedContainers.add(killedQueuedContainerId1); - ackedContainers.add(killedQueuedContainerId2); - - nodeStatusUpdater.removeOrTrackCompletedContainersFromContext( - ackedContainers); - - containerStatuses = nodeStatusUpdater.getContainerStatuses(); - - // Only the running container should be in the container statuses now. - Assert.assertEquals(1, containerStatuses.size()); - Assert.assertEquals(ContainerState.RUNNING, - containerStatuses.get(0).getState()); - } - @Test(timeout = 10000) public void testCompletedContainersIsRecentlyStopped() throws Exception { NodeManager nm = new NodeManager(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 579bea9..4ec5069 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -691,11 +692,6 @@ public abstract class BaseAMRMProxyTest { return null; } - @Override - public QueuingContext getQueuingContext() { - return null; - } - public boolean isDistributedSchedulingEnabled() { return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/368565f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 609d549..e009661 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -192,10 +193,10 @@ public abstract class BaseContainerManagerTest { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); // Default delSrvc + exec = createContainerExecutor(); delSrvc = createDeletionService(); delSrvc.init(conf); - exec = createContainerExecutor(); dirsHandler = new LocalDirsHandlerService(); nodeHealthChecker = new NodeHealthCheckerService( NodeManager.getNodeHealthScriptRunner(conf), dirsHandler); @@ -288,32 +289,43 @@ public abstract class BaseContainerManagerTest { ContainerManagementProtocol containerManager, ContainerId containerID, ContainerState finalState) throws InterruptedException, YarnException, IOException { - waitForContainerState(containerManager, containerID, finalState, 20); + waitForContainerState(containerManager, containerID, + Arrays.asList(finalState), 20); } public static void waitForContainerState( ContainerManagementProtocol containerManager, ContainerId containerID, ContainerState finalState, int timeOutMax) throws InterruptedException, YarnException, IOException { + waitForContainerState(containerManager, containerID, + Arrays.asList(finalState), timeOutMax); + } + + public static void waitForContainerState( + ContainerManagementProtocol containerManager, ContainerId containerID, + List<ContainerState> finalStates, int timeOutMax) + throws InterruptedException, YarnException, IOException { List<ContainerId> list = new ArrayList<ContainerId>(); 
list.add(containerID); GetContainerStatusesRequest request = GetContainerStatusesRequest.newInstance(list); ContainerStatus containerStatus = null; + HashSet<ContainerState> fStates = + new HashSet<>(finalStates); int timeoutSecs = 0; do { Thread.sleep(2000); containerStatus = containerManager.getContainerStatuses(request) .getContainerStatuses().get(0); - LOG.info("Waiting for container to get into state " + finalState + LOG.info("Waiting for container to get into one of states " + fStates + ". Current state is " + containerStatus.getState()); timeoutSecs += 2; - } while (!containerStatus.getState().equals(finalState) + } while (!fStates.contains(containerStatus.getState()) && timeoutSecs < timeOutMax); LOG.info("Container state is " + containerStatus.getState()); - Assert.assertEquals("ContainerState is not correct (timedout)", - finalState, containerStatus.getState()); + Assert.assertTrue("ContainerState is not correct (timedout)", + fStates.contains(containerStatus.getState())); } public static void waitForApplicationState( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org