[ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657938#comment-15657938 ]

ASF GitHub Bot commented on YARN-4597:
--------------------------------------

Github user kambatla commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/143#discussion_r87639757
  
    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---
    @@ -0,0 +1,393 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ExecutionType;
    +import org.apache.hadoop.yarn.api.records.ResourceUtilization;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.event.EventHandler;
    +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
    +import org.apache.hadoop.yarn.server.nodemanager.Context;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * The ContainerScheduler manages a collection of runnable containers. It
    + * ensures that a container is launched only if all its launch criteria are
    + * met. It also ensures that OPPORTUNISTIC containers are killed to make
    + * room for GUARANTEED containers.
    + */
    +public class ContainerScheduler extends AbstractService implements
    +    EventHandler<ContainerSchedulerEvent> {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(ContainerScheduler.class);
    +
    +  private final Context context;
    +  private final int maxOppQueueLength;
    +
    +  // Queue of Guaranteed Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedGuaranteedContainers = new LinkedHashMap<>();
    +  // Queue of Opportunistic Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedOpportunisticContainers = new LinkedHashMap<>();
    +
    +  // Used to keep track of containers that have been marked to be killed
    +  // to make room for a guaranteed container.
    +  private final Map<ContainerId, Container> oppContainersMarkedForKill =
    +      new HashMap<>();
    +
    +  // Containers launched by the Scheduler will take a while to actually
    +  // move to the RUNNING state, but should still be fair game for killing
    +  // by the scheduler to make room for guaranteed containers.
    +  private final LinkedHashMap<ContainerId, Container> scheduledToRunContainers =
    +      new LinkedHashMap<>();
    +
    +  private final ContainerQueuingLimit queuingLimit =
    +      ContainerQueuingLimit.newInstance();
    +
    +  private final OpportunisticContainersStatus opportunisticContainersStatus;
    +
    +  // Resource Utilization Manager that decides how utilization of the cluster
    +  // increases / decreases based on container start / finish
    +  private ResourceUtilizationManager utilizationManager;
    +
    +  /**
    +   * Instantiate a Container Scheduler.
    +   * @param context NodeManager Context.
    +   */
    +  public ContainerScheduler(Context context) {
    +    this(context, context.getConf().getInt(
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
    +  }
    +
    +  @VisibleForTesting
    +  public ContainerScheduler(Context context, int qLength) {
    +    super(ContainerScheduler.class.getName());
    +    this.context = context;
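    +    // Treat non-positive queue lengths as 0, i.e. opportunistic queuing disabled.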
    +    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
    +    this.utilizationManager = new ResourceUtilizationManager(this);
    +    this.opportunisticContainersStatus =
    +        OpportunisticContainersStatus.newInstance();
    +  }
    +
    +  /**
    +   * Handle ContainerSchedulerEvents.
    +   * @param event ContainerSchedulerEvent.
    +   */
    +  @Override
    +  public void handle(ContainerSchedulerEvent event) {
    +    switch (event.getType()) {
    +    case SCHEDULE_CONTAINER:
    +      scheduleContainer(event.getContainer());
    +      break;
    +    case CONTAINER_COMPLETED:
    +      onContainerCompleted(event.getContainer());
    +      break;
    +    default:
    +      LOG.error("Unknown event arrived at ContainerScheduler: "
    +          + event.toString());
    +    }
    +  }
    +
    +  /**
    +   * Return number of queued containers.
    +   * @return Number of queued containers.
    +   */
    +  public int getNumQueuedContainers() {
    +    return this.queuedGuaranteedContainers.size()
    +        + this.queuedOpportunisticContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedGuaranteedContainers() {
    +    return this.queuedGuaranteedContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedOpportunisticContainers() {
    +    return this.queuedOpportunisticContainers.size();
    +  }
    +
    +  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
    +    this.opportunisticContainersStatus.setQueuedOpportContainers(
    +        getNumQueuedOpportunisticContainers());
    +    this.opportunisticContainersStatus.setWaitQueueLength(
    +        getNumQueuedContainers());
    +    return this.opportunisticContainersStatus;
    +  }
    +
    +  private void onContainerCompleted(Container container) {
    +    // decrement only if it was a running container
    +    if (scheduledToRunContainers.containsKey(container.getContainerId())) {
    +      this.utilizationManager.subtractContainerResource(container);
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +        this.opportunisticContainersStatus.setOpportMemoryUsed(
    +            this.opportunisticContainersStatus.getOpportMemoryUsed()
    +                - container.getResource().getMemorySize());
    +        this.opportunisticContainersStatus.setOpportCoresUsed(
    +            this.opportunisticContainersStatus.getOpportCoresUsed()
    +                - container.getResource().getVirtualCores());
    +        this.opportunisticContainersStatus.setRunningOpportContainers(
    +            this.opportunisticContainersStatus.getRunningOpportContainers()
    +                - 1);
    +      }
    +    }
    +    scheduledToRunContainers.remove(container.getContainerId());
    +    oppContainersMarkedForKill.remove(container.getContainerId());
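    +    // The completed container may have freed resources; try to start queued ones.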
    +    startPendingContainers();
    +  }
    +
    +  private void startPendingContainers() {
    +    // Start pending guaranteed containers, if resources available.
    +    boolean resourcesAvailable =
    +        startContainersFromQueue(queuedGuaranteedContainers.values());
    +    // Start opportunistic containers, if resources available.
    +    if (resourcesAvailable) {
    +      startContainersFromQueue(queuedOpportunisticContainers.values());
    +    }
    +  }
    +
    +  private boolean startContainersFromQueue(
    +      Collection<Container> queuedContainers) {
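    +    // Walk the queue in FIFO order, launching containers until one is found
    +    // that does not fit in the currently available resources.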
    +    Iterator<Container> cIter = queuedContainers.iterator();
    +    boolean resourcesAvailable = true;
    +    while (cIter.hasNext() && resourcesAvailable) {
    +      Container container = cIter.next();
    +      if (this.utilizationManager.hasResourcesAvailable(container)) {
    +        startAllocatedContainer(container);
    +        cIter.remove();
    +      } else {
    +        resourcesAvailable = false;
    +      }
    +    }
    +    return resourcesAvailable;
    +  }
    +
    +  @VisibleForTesting
    +  protected void scheduleContainer(Container container) {
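    +    // maxOppQueueLength <= 0 means queuing is disabled: start immediately.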
    +    if (maxOppQueueLength <= 0) {
    +      startAllocatedContainer(container);
    +      return;
    +    }
    +    if (queuedGuaranteedContainers.isEmpty() &&
    +        queuedOpportunisticContainers.isEmpty() &&
    +        this.utilizationManager.hasResourcesAvailable(container)) {
    +      startAllocatedContainer(container);
    +    } else {
    +      LOG.info("No available resources for container {} to start its 
execution "
    +          + "immediately.", container.getContainerId());
    +      boolean isQueued = true;
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.GUARANTEED) {
    +        queuedGuaranteedContainers.put(container.getContainerId(), container);
    +        // Kill running opportunistic containers to make space for
    +        // guaranteed container.
    +        killOpportunisticContainers(container);
    +      } else {
    +        if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
    +          LOG.info("Opportunistic container {} will be queued at the NM.",
    +              container.getContainerId());
    +          queuedOpportunisticContainers.put(
    +              container.getContainerId(), container);
    +        } else {
    +          isQueued = false;
    +          LOG.info("Opportunistic container [{}] will not be queued at the 
NM" +
    +              "since max queue length [{}] has been reached",
    +              container.getContainerId(), maxOppQueueLength);
    +          container.sendKillEvent(
    +              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +              "Opportunistic container queue is full.");
    +        }
    +      }
    +      if (isQueued) {
    +        try {
    +          this.context.getNMStateStore().storeContainerQueued(
    +              container.getContainerId());
    +        } catch (IOException e) {
    +          LOG.warn("Could not store container state into store.", e);
    +        }
    +      }
    +    }
    +  }
    +
    +  private void killOpportunisticContainers(Container container) {
    +    List<Container> extraOpportContainersToKill =
    +        pickOpportunisticContainersToKill(container.getContainerId());
    +    // Kill the opportunistic containers that were chosen.
    +    for (Container contToKill : extraOpportContainersToKill) {
    +      contToKill.sendKillEvent(
    +          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +          "Container Killed to make room for Guaranteed Container.");
    +      oppContainersMarkedForKill.put(contToKill.getContainerId(), contToKill);
    +      LOG.info(
    +          "Opportunistic container {} will be killed in order to start the 
"
    +              + "execution of guaranteed container {}.",
    +          contToKill.getContainerId(), container.getContainerId());
    +    }
    +  }
    +
    +  private void startAllocatedContainer(Container container) {
    +    LOG.info("Starting container [" + container.getContainerId()+ "]");
    +    scheduledToRunContainers.put(container.getContainerId(), container);
    +    this.utilizationManager.addContainerResources(container);
    +    if (container.getContainerTokenIdentifier().getExecutionType() ==
    +        ExecutionType.OPPORTUNISTIC) {
    +      this.opportunisticContainersStatus.setOpportMemoryUsed(
    --- End diff --
    
    Method name has "used", while what we actually report here is allocated
    resources. Should we name this differently?
    
    Or, would it make more sense to add another boolean to
    OpportunisticContainersStatus to capture whether "used" refers to
    allocation or utilization?
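    
    For illustration, that second option might look roughly like the sketch
    below (the usesAllocated accessors are hypothetical and not part of this
    patch or the existing OpportunisticContainersStatus API):
    
        // Hypothetical addition to OpportunisticContainersStatus: a flag that
        // records whether opportMemoryUsed/opportCoresUsed report allocated
        // resources or measured utilization.
        public abstract boolean getUsesAllocated();
        public abstract void setUsesAllocated(boolean usesAllocated);
    
        // The ContainerScheduler would then set it when publishing the status,
        // e.g. opportunisticContainersStatus.setUsesAllocated(true);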


> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
>                 Key: YARN-4597
>                 URL: https://issues.apache.org/jira/browse/YARN-4597
>             Project: Hadoop YARN
>          Issue Type: New Feature
>          Components: nodemanager
>            Reporter: Chris Douglas
>            Assignee: Arun Suresh
>              Labels: oct16-hard
>         Attachments: YARN-4597.001.patch, YARN-4597.002.patch, 
> YARN-4597.003.patch, YARN-4597.004.patch, YARN-4597.005.patch, 
> YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch, 
> YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource 
> localization. Several features could be more cleanly implemented if the NM 
> included a separate stage for reserving resources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org
