[ https://issues.apache.org/jira/browse/YARN-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658343#comment-15658343 ]

ASF GitHub Bot commented on YARN-4597:
--------------------------------------

Github user xslogic commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/143#discussion_r87668561
  
    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java ---
    @@ -0,0 +1,393 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.service.AbstractService;
    +import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ExecutionType;
    +import org.apache.hadoop.yarn.api.records.ResourceUtilization;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.event.EventHandler;
    +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
    +import org.apache.hadoop.yarn.server.nodemanager.Context;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * The ContainerScheduler manages a collection of runnable containers. It
    + * ensures that a container is launched only if all its launch criteria are
    + * met. It also ensures that OPPORTUNISTIC containers are killed to make
    + * room for GUARANTEED containers.
    + */
    +public class ContainerScheduler extends AbstractService implements
    +    EventHandler<ContainerSchedulerEvent> {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(ContainerScheduler.class);
    +
    +  private final Context context;
    +  private final int maxOppQueueLength;
    +
    +  // Queue of Guaranteed Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedGuaranteedContainers = new LinkedHashMap<>();
    +  // Queue of Opportunistic Containers waiting for resources to run
    +  private final LinkedHashMap<ContainerId, Container>
    +      queuedOpportunisticContainers = new LinkedHashMap<>();
    +
    +  // Used to keep track of containers that have been marked to be killed
    +  // to make room for a guaranteed container.
    +  private final Map<ContainerId, Container> oppContainersMarkedForKill =
    +      new HashMap<>();
    +
    +  // Containers launched by the Scheduler will take a while to actually
    +  // move to the RUNNING state, but should still be fair game for killing
    +  // by the scheduler to make room for guaranteed containers.
    +  private final LinkedHashMap<ContainerId, Container> scheduledToRunContainers =
    +      new LinkedHashMap<>();
    +
    +  private final ContainerQueuingLimit queuingLimit =
    +      ContainerQueuingLimit.newInstance();
    +
    +  private final OpportunisticContainersStatus opportunisticContainersStatus;
    +
    +  // Resource Utilization Manager that decides how utilization of the cluster
    +  // increases / decreases based on container start / finish
    +  private ResourceUtilizationManager utilizationManager;
    +
    +  /**
    +   * Instantiate a Container Scheduler.
    +   * @param context NodeManager Context.
    +   */
    +  public ContainerScheduler(Context context) {
    +    this(context, context.getConf().getInt(
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
    +        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH_DEFAULT));
    +  }
    +
    +  @VisibleForTesting
    +  public ContainerScheduler(Context context, int qLength) {
    +    super(ContainerScheduler.class.getName());
    +    this.context = context;
    +    this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
    +    this.utilizationManager = new ResourceUtilizationManager(this);
    +    this.opportunisticContainersStatus =
    +        OpportunisticContainersStatus.newInstance();
    +  }
    +
    +  /**
    +   * Handle ContainerSchedulerEvents.
    +   * @param event ContainerSchedulerEvent.
    +   */
    +  @Override
    +  public void handle(ContainerSchedulerEvent event) {
    +    switch (event.getType()) {
    +    case SCHEDULE_CONTAINER:
    +      scheduleContainer(event.getContainer());
    +      break;
    +    case CONTAINER_COMPLETED:
    +      onContainerCompleted(event.getContainer());
    +      break;
    +    default:
    +      LOG.error("Unknown event arrived at ContainerScheduler: "
    +          + event.toString());
    +    }
    +  }
    +
    +  /**
    +   * Return number of queued containers.
    +   * @return Number of queued containers.
    +   */
    +  public int getNumQueuedContainers() {
    +    return this.queuedGuaranteedContainers.size()
    +        + this.queuedOpportunisticContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedGuaranteedContainers() {
    +    return this.queuedGuaranteedContainers.size();
    +  }
    +
    +  @VisibleForTesting
    +  public int getNumQueuedOpportunisticContainers() {
    +    return this.queuedOpportunisticContainers.size();
    +  }
    +
    +  public OpportunisticContainersStatus getOpportunisticContainersStatus() {
    +    this.opportunisticContainersStatus.setQueuedOpportContainers(
    +        getNumQueuedOpportunisticContainers());
    +    this.opportunisticContainersStatus.setWaitQueueLength(
    +        getNumQueuedContainers());
    +    return this.opportunisticContainersStatus;
    +  }
    +
    +  private void onContainerCompleted(Container container) {
    +    // decrement only if it was a running container
    +    if (scheduledToRunContainers.containsKey(container.getContainerId())) {
    +      this.utilizationManager.subtractContainerResource(container);
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +        this.opportunisticContainersStatus.setOpportMemoryUsed(
    +            this.opportunisticContainersStatus.getOpportMemoryUsed()
    +                - container.getResource().getMemorySize());
    +        this.opportunisticContainersStatus.setOpportCoresUsed(
    +            this.opportunisticContainersStatus.getOpportCoresUsed()
    +                - container.getResource().getVirtualCores());
    +        this.opportunisticContainersStatus.setRunningOpportContainers(
    +            this.opportunisticContainersStatus.getRunningOpportContainers()
    +                - 1);
    +      }
    +    }
    +    scheduledToRunContainers.remove(container.getContainerId());
    +    oppContainersMarkedForKill.remove(container.getContainerId());
    +    startPendingContainers();
    +  }
    +
    +  private void startPendingContainers() {
    +    // Start pending guaranteed containers, if resources available.
    +    boolean resourcesAvailable =
    +        startContainersFromQueue(queuedGuaranteedContainers.values());
    +    // Start opportunistic containers, if resources available.
    +    if (resourcesAvailable) {
    +      startContainersFromQueue(queuedOpportunisticContainers.values());
    +    }
    +  }
    +
    +  private boolean startContainersFromQueue(
    +      Collection<Container> queuedContainers) {
    +    Iterator<Container> cIter = queuedContainers.iterator();
    +    boolean resourcesAvailable = true;
    +    while (cIter.hasNext() && resourcesAvailable) {
    +      Container container = cIter.next();
    +      if (this.utilizationManager.hasResourcesAvailable(container)) {
    +        startAllocatedContainer(container);
    +        cIter.remove();
    +      } else {
    +        resourcesAvailable = false;
    +      }
    +    }
    +    return resourcesAvailable;
    +  }
    +
    +  @VisibleForTesting
    +  protected void scheduleContainer(Container container) {
    +    if (maxOppQueueLength <= 0) {
    +      startAllocatedContainer(container);
    +      return;
    +    }
    +    if (queuedGuaranteedContainers.isEmpty() &&
    +        queuedOpportunisticContainers.isEmpty() &&
    +        this.utilizationManager.hasResourcesAvailable(container)) {
    +      startAllocatedContainer(container);
    +    } else {
    +      LOG.info("No available resources for container {} to start its 
execution "
    +          + "immediately.", container.getContainerId());
    +      boolean isQueued = true;
    +      if (container.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.GUARANTEED) {
    +        queuedGuaranteedContainers.put(container.getContainerId(), container);
    +        // Kill running opportunistic containers to make space for
    +        // guaranteed container.
    +        killOpportunisticContainers(container);
    +      } else {
    +        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
    +          LOG.info("Opportunistic container {} will be queued at the NM.",
    +              container.getContainerId());
    +          queuedOpportunisticContainers.put(
    +              container.getContainerId(), container);
    +        } else {
    +          isQueued = false;
    +          LOG.info("Opportunistic container [{}] will not be queued at the 
NM" +
    +              "since max queue length [{}] has been reached",
    +              container.getContainerId(), maxOppQueueLength);
    +          container.sendKillEvent(
    +              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +              "Opportunistic container queue is full.");
    +        }
    +      }
    +      if (isQueued) {
    +        try {
    +          this.context.getNMStateStore().storeContainerQueued(
    +              container.getContainerId());
    +        } catch (IOException e) {
    +          LOG.warn("Could not store container state into store..", e);
    +        }
    +      }
    +    }
    +  }
    +
    +  private void killOpportunisticContainers(Container container) {
    +    List<Container> extraOpportContainersToKill =
    +        pickOpportunisticContainersToKill(container.getContainerId());
    +    // Kill the opportunistic containers that were chosen.
    +    for (Container contToKill : extraOpportContainersToKill) {
    +      contToKill.sendKillEvent(
    +          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +          "Container Killed to make room for Guaranteed Container.");
    +      oppContainersMarkedForKill.put(contToKill.getContainerId(), contToKill);
    +      LOG.info(
    +          "Opportunistic container {} will be killed in order to start the 
"
    +              + "execution of guaranteed container {}.",
    +          contToKill.getContainerId(), container.getContainerId());
    +    }
    +  }
    +
    +  private void startAllocatedContainer(Container container) {
    +    LOG.info("Starting container [" + container.getContainerId()+ "]");
    +    scheduledToRunContainers.put(container.getContainerId(), container);
    +    this.utilizationManager.addContainerResources(container);
    +    if (container.getContainerTokenIdentifier().getExecutionType() ==
    +        ExecutionType.OPPORTUNISTIC) {
    +      this.opportunisticContainersStatus.setOpportMemoryUsed(
    +          this.opportunisticContainersStatus.getOpportMemoryUsed()
    +              + container.getResource().getMemorySize());
    +      this.opportunisticContainersStatus.setOpportCoresUsed(
    +          this.opportunisticContainersStatus.getOpportCoresUsed()
    +              + container.getResource().getVirtualCores());
    +      this.opportunisticContainersStatus.setRunningOpportContainers(
    +          this.opportunisticContainersStatus.getRunningOpportContainers()
    +              + 1);
    +    }
    +    container.sendLaunchEvent();
    +  }
    +
    +  private List<Container> pickOpportunisticContainersToKill(
    +      ContainerId containerToStartId) {
    +    // The additional opportunistic containers that need to be killed for the
    +    // given container to start.
    +    List<Container> extraOpportContainersToKill = new ArrayList<>();
    +    // Track resources that need to be freed.
    +    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
    +        containerToStartId);
    +
    +    // Go over the running opportunistic containers.
    +    // Use a descending iterator to kill more recently started containers.
    +    Iterator<Container> reverseContainerIterator =
    +        new LinkedList<>(scheduledToRunContainers.values()).descendingIterator();
    +    while(reverseContainerIterator.hasNext() &&
    +        !hasSufficientResources(resourcesToFreeUp)) {
    +      Container runningCont = reverseContainerIterator.next();
    +      if (runningCont.getContainerTokenIdentifier().getExecutionType() ==
    +          ExecutionType.OPPORTUNISTIC) {
    +
    +        if (oppContainersMarkedForKill.containsKey(
    +            runningCont.getContainerId())) {
    +          // These containers have already been marked to be killed.
    +          // So exclude them..
    +          continue;
    +        }
    +        extraOpportContainersToKill.add(runningCont);
    +        ResourceUtilizationManager.decreaseResourceUtilization(
    +            getContainersMonitor(), resourcesToFreeUp,
    +            runningCont.getResource());
    +      }
    +    }
    +    if (!hasSufficientResources(resourcesToFreeUp)) {
    +      LOG.warn("There are not sufficient resources to start guaranteed [{}] " +
    +          "even after attempting to kill all running " +
    +          "opportunistic containers.", containerToStartId);
    +    }
    +    return extraOpportContainersToKill;
    +  }
    +
    +  private boolean hasSufficientResources(
    +      ResourceUtilization resourcesToFreeUp) {
    +    return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
    +        resourcesToFreeUp.getVirtualMemory() <= 0 &&
    +        resourcesToFreeUp.getCPU() <= 0.0f;
    +  }
    +
    +  private ResourceUtilization resourcesToFreeUp(
    +      ContainerId containerToStartId) {
    +    // Get allocation of currently allocated containers.
    +    ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
    +        .newInstance(this.utilizationManager.getCurrentUtilization());
    +
    +    // Add to the allocation the allocation of the pending guaranteed
    +    // containers that will start before the current container will be started.
    +    for (Container container : queuedGuaranteedContainers.values()) {
    +      ResourceUtilizationManager.increaseResourceUtilization(
    +          getContainersMonitor(), resourceAllocationToFreeUp,
    +          container.getResource());
    +      if (container.getContainerId().equals(containerToStartId)) {
    +        break;
    +      }
    +    }
    +
    +    // These Resources have already been freed, due to demand from an
    +    // earlier Guaranteed container.
    +    for (Container container : oppContainersMarkedForKill.values()) {
    +      ResourceUtilizationManager.decreaseResourceUtilization(
    +          getContainersMonitor(), resourceAllocationToFreeUp,
    +          container.getResource());
    +    }
    +
    +    // Subtract the overall node resources.
    +    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
    +        resourceAllocationToFreeUp);
    +    return resourceAllocationToFreeUp;
    +  }
    +
    +  public void updateQueuingLimit(ContainerQueuingLimit limit) {
    +    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
    +    // TODO: Include wait time as well once it is implemented
    +    if (this.queuingLimit.getMaxQueueLength() > -1) {
    +      shedQueuedOpportunisticContainers();
    +    }
    +  }
    +
    +  private void shedQueuedOpportunisticContainers() {
    +    int numAllowed = this.queuingLimit.getMaxQueueLength();
    +    Iterator<Container> containerIter =
    +        queuedOpportunisticContainers.values().iterator();
    +    while (containerIter.hasNext()) {
    +      Container container = containerIter.next();
    +      if (numAllowed <= 0) {
    +        container.sendKillEvent(
    +            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
    +            "Container Killed to make room for Guaranteed Container.");
    --- End diff --
    
    The LOG.info line is printed immediately, which is why it says 'will be killed'.
    The diagnostic line is part of the container status message; it is in the past tense and does not need the container ID.
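    
    For reference, a minimal sketch of the pattern under discussion, assuming the ContainerScheduler fields and the Container.sendKillEvent(exitStatus, diagnostics) call shown in the diff above (the helper name and the diagnostic text here are illustrative, not the patch's actual code):
    
        // The LOG line fires at the moment the kill is requested, before the
        // container has actually exited, hence the future tense "will be killed".
        // The diagnostic string travels with the kill event and is surfaced later
        // as part of the container's status, hence the past tense and no
        // container id (the status already identifies the container).
        private void killQueuedContainer(Container container) {
          LOG.info("Opportunistic container {} will be killed to enforce the NM "
              + "queuing limit.", container.getContainerId());
          container.sendKillEvent(
              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
              "Container killed to enforce NM queuing limits.");
        }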


> Add SCHEDULE to NM container lifecycle
> --------------------------------------
>
>                 Key: YARN-4597
>                 URL: https://issues.apache.org/jira/browse/YARN-4597
>             Project: Hadoop YARN
>          Issue Type: New Feature
>          Components: nodemanager
>            Reporter: Chris Douglas
>            Assignee: Arun Suresh
>              Labels: oct16-hard
>         Attachments: YARN-4597.001.patch, YARN-4597.002.patch, 
> YARN-4597.003.patch, YARN-4597.004.patch, YARN-4597.005.patch, 
> YARN-4597.006.patch, YARN-4597.007.patch, YARN-4597.008.patch, 
> YARN-4597.009.patch
>
>
> Currently, the NM immediately launches containers after resource 
> localization. Several features could be more cleanly implemented if the NM 
> included a separate stage for reserving resources.
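
A minimal sketch of the proposed lifecycle change (illustrative only; the state names below are assumptions, not the enum actually introduced by the patch):

    // Possible NM-side container lifecycle with an explicit scheduling stage
    // between resource localization and launch. With such a stage, the
    // ContainerScheduler can queue localized containers and start them only
    // when their launch criteria (e.g. available node resources) are met.
    enum NMContainerStage {
      NEW,         // container request received by the NodeManager
      LOCALIZING,  // resources being downloaded to the node
      SCHEDULED,   // localized and queued, waiting for resources to run
      RUNNING,     // launched by the NodeManager
      DONE         // finished, failed, or killed
    }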


