[ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657956#comment-15657956 ]
ASF GitHub Bot commented on YARN-4597:
--------------------------------------
Github user kambatla commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/143#discussion_r87640967
--- Diff:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
---
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The ContainerScheduler manages a collection of runnable containers. It
+ * ensures that a container is launched only if all its launch criteria are
+ * met. It also ensures that OPPORTUNISTIC containers are killed to make
+ * room for GUARANTEED containers.
+ */
+public class ContainerScheduler extends AbstractService implements
+ EventHandler<ContainerSchedulerEvent> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerScheduler.class);
+
+ private final Context context;
+ private final int maxOppQueueLength;
+
+ // Queue of Guaranteed Containers waiting for resources to run
+ private final LinkedHashMap<ContainerId, Container>
+ queuedGuaranteedContainers = new LinkedHashMap<>();
+ // Queue of Opportunistic Containers waiting for resources to run
+ private final LinkedHashMap<ContainerId, Container>
+ queuedOpportunisticContainers = new LinkedHashMap<>();
+
+ // Used to keep track of containers that have been marked to be killed
+ // to make room for a guaranteed container.
+ private final Map<ContainerId, Container> oppContainersMarkedForKill =
+ new HashMap<>();
+
+ // Containers launched by the Scheduler will take a while to actually
+ // move to the RUNNING state, but should still be fair game for killing
+ // by the scheduler to make room for guaranteed containers.
+ private final LinkedHashMap<ContainerId, Container> scheduledToRunContainers =
+ new LinkedHashMap<>();
+
+ private final ContainerQueuingLimit queuingLimit =
+ ContainerQueuingLimit.newInstance();
+
+ private final OpportunisticContainersStatus opportunisticContainersStatus;
+
+ // Resource Utilization Manager that decides how utilization of the cluster
+ // increases / decreases based on container start / finish
+ private ResourceUtilizationManager utilizationManager;
+
+ /**
+ * Instantiate a Container Scheduler.
+ * @param context NodeManager Context.
+ */
+ public ContainerScheduler(Context context) {
+ this(context, context.getConf().getInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
+ }
+
+ @VisibleForTesting
+ public ContainerScheduler(Context context, int qLength) {
+ super(ContainerScheduler.class.getName());
+ this.context = context;
+ this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+ this.utilizationManager = new ResourceUtilizationManager(this);
+ this.opportunisticContainersStatus =
+ OpportunisticContainersStatus.newInstance();
+ }
+
+ /**
+ * Handle ContainerSchedulerEvents.
+ * @param event ContainerSchedulerEvent.
+ */
+ @Override
+ public void handle(ContainerSchedulerEvent event) {
+ switch (event.getType()) {
+ case SCHEDULE_CONTAINER:
+ scheduleContainer(event.getContainer());
+ break;
+ case CONTAINER_COMPLETED:
+ onContainerCompleted(event.getContainer());
+ break;
+ default:
+ LOG.error("Unknown event arrived at ContainerScheduler: "
+ + event.toString());
+ }
+ }
+
+ /**
+ * Return number of queued containers.
+ * @return Number of queued containers.
+ */
+ public int getNumQueuedContainers() {
+ return this.queuedGuaranteedContainers.size()
+ + this.queuedOpportunisticContainers.size();
+ }
+
+ @VisibleForTesting
+ public int getNumQueuedGuaranteedContainers() {
+ return this.queuedGuaranteedContainers.size();
+ }
+
+ @VisibleForTesting
+ public int getNumQueuedOpportunisticContainers() {
+ return this.queuedOpportunisticContainers.size();
+ }
+
+ public OpportunisticContainersStatus getOpportunisticContainersStatus() {
+ this.opportunisticContainersStatus.setQueuedOpportContainers(
+ getNumQueuedOpportunisticContainers());
+ this.opportunisticContainersStatus.setWaitQueueLength(
+ getNumQueuedContainers());
+ return this.opportunisticContainersStatus;
+ }
+
+ private void onContainerCompleted(Container container) {
+ // decrement only if it was a running container
+ if (scheduledToRunContainers.containsKey(container.getContainerId())) {
+ this.utilizationManager.subtractContainerResource(container);
+ if (container.getContainerTokenIdentifier().getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
+ this.opportunisticContainersStatus.setOpportMemoryUsed(
+ this.opportunisticContainersStatus.getOpportMemoryUsed()
+ - container.getResource().getMemorySize());
+ this.opportunisticContainersStatus.setOpportCoresUsed(
+ this.opportunisticContainersStatus.getOpportCoresUsed()
+ - container.getResource().getVirtualCores());
+ this.opportunisticContainersStatus.setRunningOpportContainers(
+ this.opportunisticContainersStatus.getRunningOpportContainers()
+ - 1);
+ }
+ }
+ scheduledToRunContainers.remove(container.getContainerId());
+ oppContainersMarkedForKill.remove(container.getContainerId());
+ startPendingContainers();
+ }
+
+ private void startPendingContainers() {
+ // Start pending guaranteed containers, if resources available.
+ boolean resourcesAvailable =
+ startContainersFromQueue(queuedGuaranteedContainers.values());
+ // Start opportunistic containers, if resources available.
+ if (resourcesAvailable) {
+ startContainersFromQueue(queuedOpportunisticContainers.values());
+ }
+ }
+
+ private boolean startContainersFromQueue(
+ Collection<Container> queuedContainers) {
+ Iterator<Container> cIter = queuedContainers.iterator();
+ boolean resourcesAvailable = true;
+ while (cIter.hasNext() && resourcesAvailable) {
+ Container container = cIter.next();
+ if (this.utilizationManager.hasResourcesAvailable(container)) {
+ startAllocatedContainer(container);
+ cIter.remove();
+ } else {
+ resourcesAvailable = false;
+ }
+ }
+ return resourcesAvailable;
+ }
+
+ @VisibleForTesting
+ protected void scheduleContainer(Container container) {
+ if (maxOppQueueLength <= 0) {
+ startAllocatedContainer(container);
+ return;
+ }
+ if (queuedGuaranteedContainers.isEmpty() &&
+ queuedOpportunisticContainers.isEmpty() &&
+ this.utilizationManager.hasResourcesAvailable(container)) {
+ startAllocatedContainer(container);
+ } else {
+ LOG.info("No available resources for container {} to start its
execution "
+ + "immediately.", container.getContainerId());
+ boolean isQueued = true;
+ if (container.getContainerTokenIdentifier().getExecutionType() ==
+ ExecutionType.GUARANTEED) {
+ queuedGuaranteedContainers.put(container.getContainerId(), container);
+ // Kill running opportunistic containers to make space for
+ // guaranteed container.
+ killOpportunisticContainers(container);
+ } else {
+ if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+ LOG.info("Opportunistic container {} will be queued at the NM.",
+ container.getContainerId());
+ queuedOpportunisticContainers.put(
+ container.getContainerId(), container);
+ } else {
+ isQueued = false;
+ LOG.info("Opportunistic container [{}] will not be queued at the
NM" +
+ "since max queue length [{}] has been reached",
+ container.getContainerId(), maxOppQueueLength);
+ container.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Opportunistic container queue is full.");
+ }
+ }
+ if (isQueued) {
+ try {
+ this.context.getNMStateStore().storeContainerQueued(
+ container.getContainerId());
+ } catch (IOException e) {
+ LOG.warn("Could not store container state into store.", e);
+ }
+ }
+ }
+ }
+
+ private void killOpportunisticContainers(Container container) {
+ List<Container> extraOpportContainersToKill =
+ pickOpportunisticContainersToKill(container.getContainerId());
+ // Kill the opportunistic containers that were chosen.
+ for (Container contToKill : extraOpportContainersToKill) {
+ contToKill.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Container Killed to make room for Guaranteed Container.");
+ oppContainersMarkedForKill.put(contToKill.getContainerId(), contToKill);
+ LOG.info(
+ "Opportunistic container {} will be killed in order to start the "
+ + "execution of guaranteed container {}.",
+ contToKill.getContainerId(), container.getContainerId());
+ }
+ }
+
+ private void startAllocatedContainer(Container container) {
+ LOG.info("Starting container [" + container.getContainerId()+ "]");
+ scheduledToRunContainers.put(container.getContainerId(), container);
+ this.utilizationManager.addContainerResources(container);
+ if (container.getContainerTokenIdentifier().getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
+ this.opportunisticContainersStatus.setOpportMemoryUsed(
+ this.opportunisticContainersStatus.getOpportMemoryUsed()
+ + container.getResource().getMemorySize());
+ this.opportunisticContainersStatus.setOpportCoresUsed(
+ this.opportunisticContainersStatus.getOpportCoresUsed()
+ + container.getResource().getVirtualCores());
+ this.opportunisticContainersStatus.setRunningOpportContainers(
+ this.opportunisticContainersStatus.getRunningOpportContainers()
+ + 1);
+ }
+ container.sendLaunchEvent();
+ }
+
+ private List<Container> pickOpportunisticContainersToKill(
+ ContainerId containerToStartId) {
+ // The additional opportunistic containers that need to be killed for the
+ // given container to start.
+ List<Container> extraOpportContainersToKill = new ArrayList<>();
+ // Track resources that need to be freed.
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+ containerToStartId);
+
+ // Go over the running opportunistic containers.
+ // Use a descending iterator to kill more recently started containers.
+ Iterator<Container> reverseContainerIterator =
+ new
LinkedList<>(scheduledToRunContainers.values()).descendingIterator();
+ while(reverseContainerIterator.hasNext() &&
+ !hasSufficientResources(resourcesToFreeUp)) {
+ Container runningCont = reverseContainerIterator.next();
+ if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
+
+ if (oppContainersMarkedForKill.containsKey(
--- End diff --
How about removing all containers that are already marked before we get to
this?
If runningOppContainers were a stack and oppContainersToKill were a queue,
you could just do

```java
runningOppContainers.removeAll(oppContainersToKill);
for (Container container : runningOppContainers) {
  killContainer(container);
  if (hasSufficientResources()) {
    break;
  }
}
```
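
To make the shape concrete, here is a minimal, self-contained sketch of that
loop. The `Opp` record, the sizes, and the printouts are hypothetical
stand-ins for the NM's `Container`, `sendKillEvent`, and
`hasSufficientResources` machinery, not the patch's actual API; the point is
only the ordering: drop already-marked containers up front, then kill
newest-first until enough resources would be freed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class KillSelectionSketch {

  /** Hypothetical stand-in for an NM container: an id plus a memory footprint. */
  record Opp(String id, long memMB) {}

  public static void main(String[] args) {
    // push() makes this behave like a stack: iteration sees the most
    // recently started container first.
    Deque<Opp> runningOppContainers = new ArrayDeque<>();
    runningOppContainers.push(new Opp("c1", 512));
    runningOppContainers.push(new Opp("c2", 1024));
    runningOppContainers.push(new Opp("c3", 2048));

    // Containers already marked for kill by an earlier pass.
    Set<Opp> oppContainersToKill = new HashSet<>();
    oppContainersToKill.add(new Opp("c3", 2048));

    long memToFree = 1024; // what the guaranteed container still needs

    // Drop the already-marked containers up front ...
    runningOppContainers.removeAll(oppContainersToKill);

    // ... then kill newest-first until enough memory would be freed.
    for (Opp c : runningOppContainers) {
      if (memToFree <= 0) {
        break; // stand-in for hasSufficientResources()
      }
      System.out.println("Would kill " + c.id()); // stand-in for sendKillEvent()
      memToFree -= c.memMB();
    }
  }
}
```

With the marked containers removed before the loop starts, the
`oppContainersMarkedForKill.containsKey(...)` check this comment is anchored
on disappears from the hot path entirely.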
> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
> Key: YARN-4597
> URL: https://issues.apache.org/jira/browse/YARN-4597
> Project: Hadoop YARN
> Issue Type: New Feature
> Components: nodemanager
> Reporter: Chris Douglas
> Assignee: Arun Suresh
> Labels: oct16-hard
> Attachments: YARN-4597.001.patch, YARN-4597.002.patch,
> YARN-4597.003.patch, YARN-4597.004.patch, YARN-4597.005.patch,
> YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch,
> YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource
> localization. Several features could be more cleanly implemented if the NM
> included a separate stage for reserving resources.