Author: wang
Date: Fri Oct 25 01:56:56 2013
New Revision: 1535608
URL: http://svn.apache.org/r1535608
Log:
merge trunk to hdfs-4949 branch
Modified:
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Modified: hadoop/common/branches/HDFS-4949/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/CHANGES.txt?rev=1535608&r1=1535607&r2=1535608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-yarn-project/CHANGES.txt Fri Oct 25
01:56:56 2013
@@ -108,6 +108,9 @@ Release 2.2.1 - UNRELEASED
YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza)
+ YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
+ into SchedulerApplication (Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1535608&r1=1535607&r2=1535608&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
(original)
+++
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
Fri Oct 25 01:56:56 2013
@@ -17,44 +17,385 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
/**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
* of this class.
*/
@Private
@Unstable
public abstract class SchedulerApplication {
+
+ private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
+
+ protected final AppSchedulingInfo appSchedulingInfo;
+
+ protected final Map<ContainerId, RMContainer> liveContainers =
+ new HashMap<ContainerId, RMContainer>();
+ protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+ new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+ private final Multiset<Priority> reReservations = HashMultiset.create();
+
+ protected final Resource currentReservation = Resource.newInstance(0, 0);
+ private Resource resourceLimit = Resource.newInstance(0, 0);
+ protected final Resource currentConsumption = Resource.newInstance(0, 0);
+
+ protected List<RMContainer> newlyAllocatedContainers =
+ new ArrayList<RMContainer>();
/**
- * Get {@link ApplicationAttemptId} of the application master.
- * @return <code>ApplicationAttemptId</code> of the application master
+ * Count how many times the application has been given an opportunity
+ * to schedule a task at each priority. Each time the scheduler
+ * asks the application for a task at this priority, it is incremented,
+ * and each time the application successfully schedules a task, it
+ * is reset to 0.
*/
- public abstract ApplicationAttemptId getApplicationAttemptId();
+ Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+
+ // Time of the last container scheduled at the current allowed level
+ protected Map<Priority, Long> lastScheduledContainer =
+ new HashMap<Priority, Long>();
+
+ protected final Queue queue;
+ protected boolean isStopped = false;
+
+ protected final RMContext rmContext;
+
+ public SchedulerApplication(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext) {
+ this.rmContext = rmContext;
+ this.appSchedulingInfo =
+ new AppSchedulingInfo(applicationAttemptId, user, queue,
+ activeUsersManager);
+ this.queue = queue;
+ }
/**
* Get the live containers of the application.
* @return live containers of the application
*/
- public abstract Collection<RMContainer> getLiveContainers();
+ public synchronized Collection<RMContainer> getLiveContainers() {
+ return new ArrayList<RMContainer>(liveContainers.values());
+ }
/**
- * Get the reserved containers of the application.
- * @return the reserved containers of the application
+ * Is this application pending?
+ * @return true if it is else false.
*/
- public abstract Collection<RMContainer> getReservedContainers();
+ public boolean isPending() {
+ return appSchedulingInfo.isPending();
+ }
/**
- * Is this application pending?
- * @return true if it is else false.
+ * Get {@link ApplicationAttemptId} of the application master.
+ * @return <code>ApplicationAttemptId</code> of the application master
+ */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appSchedulingInfo.getApplicationAttemptId();
+ }
+
+ public ApplicationId getApplicationId() {
+ return appSchedulingInfo.getApplicationId();
+ }
+
+ public String getUser() {
+ return appSchedulingInfo.getUser();
+ }
+
+ public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+ return appSchedulingInfo.getResourceRequests(priority);
+ }
+
+ public int getNewContainerId() {
+ return appSchedulingInfo.getNewContainerId();
+ }
+
+ public Collection<Priority> getPriorities() {
+ return appSchedulingInfo.getPriorities();
+ }
+
+ public ResourceRequest getResourceRequest(Priority priority, String
resourceName) {
+ return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+ }
+
+ public synchronized int getTotalRequiredResources(Priority priority) {
+ return getResourceRequest(priority,
ResourceRequest.ANY).getNumContainers();
+ }
+
+ public Resource getResource(Priority priority) {
+ return appSchedulingInfo.getResource(priority);
+ }
+
+ public String getQueueName() {
+ return appSchedulingInfo.getQueueName();
+ }
+
+ public synchronized RMContainer getRMContainer(ContainerId id) {
+ return liveContainers.get(id);
+ }
+
+ protected synchronized void resetReReservations(Priority priority) {
+ reReservations.setCount(priority, 0);
+ }
+
+ protected synchronized void addReReservation(Priority priority) {
+ reReservations.add(priority);
+ }
+
+ public synchronized int getReReservations(Priority priority) {
+ return reReservations.count(priority);
+ }
+
+ /**
+ * Get total current reservations.
+ * Used only by unit tests
+ * @return total current reservations
+ */
+ @Stable
+ @Private
+ public synchronized Resource getCurrentReservation() {
+ return currentReservation;
+ }
+
+ public Queue getQueue() {
+ return queue;
+ }
+
+ public synchronized void updateResourceRequests(
+ List<ResourceRequest> requests) {
+ if (!isStopped) {
+ appSchedulingInfo.updateResourceRequests(requests);
+ }
+ }
+
+ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ // Cleanup all scheduling information
+ isStopped = true;
+ appSchedulingInfo.stop(rmAppAttemptFinalState);
+ }
+
+ public synchronized boolean isStopped() {
+ return isStopped;
+ }
+
+ /**
+ * Get the list of reserved containers
+ * @return All of the reserved containers.
+ */
+ public synchronized List<RMContainer> getReservedContainers() {
+ List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+ for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
+ this.reservedContainers.entrySet()) {
+ reservedContainers.addAll(e.getValue().values());
+ }
+ return reservedContainers;
+ }
+
+ public synchronized RMContainer reserve(SchedulerNode node, Priority
priority,
+ RMContainer rmContainer, Container container) {
+ // Create RMContainer if necessary
+ if (rmContainer == null) {
+ rmContainer =
+ new RMContainerImpl(container, getApplicationAttemptId(),
+ node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+ rmContext.getContainerAllocationExpirer());
+
+ Resources.addTo(currentReservation, container.getResource());
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+ } else {
+ // Note down the re-reservation
+ addReReservation(priority);
+ }
+ rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+ container.getResource(), node.getNodeID(), priority));
+
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers == null) {
+ reservedContainers = new HashMap<NodeId, RMContainer>();
+ this.reservedContainers.put(priority, reservedContainers);
+ }
+ reservedContainers.put(node.getNodeID(), rmContainer);
+
+ LOG.info("Application " + getApplicationId()
+ + " reserved container " + rmContainer
+ + " on node " + node + ", currently has " + reservedContainers.size()
+ + " at priority " + priority
+ + "; currentReservation " + currentReservation.getMemory());
+
+ return rmContainer;
+ }
+
+ /**
+ * Has the application reserved the given <code>node</code> at the
+ * given <code>priority</code>?
+ * @param node node to be checked
+ * @param priority priority of reserved container
+ * @return true is reserved, false if not
+ */
+ public synchronized boolean isReserved(SchedulerNode node, Priority
priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers != null) {
+ return reservedContainers.containsKey(node.getNodeID());
+ }
+ return false;
+ }
+
+ public synchronized void setHeadroom(Resource globalLimit) {
+ this.resourceLimit = globalLimit;
+ }
+
+ /**
+ * Get available headroom in terms of resources for the application's user.
+ * @return available resource headroom
+ */
+ public synchronized Resource getHeadroom() {
+ // Corner case to deal with applications being slightly over-limit
+ if (resourceLimit.getMemory() < 0) {
+ resourceLimit.setMemory(0);
+ }
+
+ return resourceLimit;
+ }
+
+ public synchronized int getNumReservedContainers(Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ return (reservedContainers == null) ? 0 : reservedContainers.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void containerLaunchedOnNode(ContainerId containerId,
+ NodeId nodeId) {
+ // Inform the container
+ RMContainer rmContainer = getRMContainer(containerId);
+ if (rmContainer == null) {
+ // Some unknown container sneaked into the system. Kill it.
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+ return;
+ }
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.LAUNCHED));
+ }
+
+ public synchronized void showRequests() {
+ if (LOG.isDebugEnabled()) {
+ for (Priority priority : getPriorities()) {
+ Map<String, ResourceRequest> requests = getResourceRequests(priority);
+ if (requests != null) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId() +
+ " headRoom=" + getHeadroom() +
+ " currentConsumption=" + currentConsumption.getMemory());
+ for (ResourceRequest request : requests.values()) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId()
+ + " request=" + request);
+ }
+ }
+ }
+ }
+ }
+
+ public Resource getCurrentConsumption() {
+ return currentConsumption;
+ }
+
+ public synchronized List<Container> pullNewlyAllocatedContainers() {
+ List<Container> returnContainerList = new ArrayList<Container>(
+ newlyAllocatedContainers.size());
+ for (RMContainer rmContainer : newlyAllocatedContainers) {
+ rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+ RMContainerEventType.ACQUIRED));
+ returnContainerList.add(rmContainer.getContainer());
+ }
+ newlyAllocatedContainers.clear();
+ return returnContainerList;
+ }
+
+ public synchronized void updateBlacklist(
+ List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ if (!isStopped) {
+ this.appSchedulingInfo.updateBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ }
+ }
+
+ public boolean isBlacklisted(String resourceName) {
+ return this.appSchedulingInfo.isBlacklisted(resourceName);
+ }
+
+ public synchronized void addSchedulingOpportunity(Priority priority) {
+ schedulingOpportunities.setCount(priority,
+ schedulingOpportunities.count(priority) + 1);
+ }
+
+ public synchronized void subtractSchedulingOpportunity(Priority priority) {
+ int count = schedulingOpportunities.count(priority) - 1;
+ this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
+ }
+
+ /**
+ * Return the number of times the application has been given an opportunity
+ * to schedule a task at the given priority since the last time it
+ * successfully did so.
+ */
+ public synchronized int getSchedulingOpportunities(Priority priority) {
+ return schedulingOpportunities.count(priority);
+ }
+
+ /**
+ * Should be called when an application has successfully scheduled a
container,
+ * or when the scheduling locality threshold is relaxed.
+ * Reset various internal counters which affect delay scheduling
+ *
+ * @param priority The priority of the container scheduled.
*/
- public abstract boolean isPending();
+ public synchronized void resetSchedulingOpportunities(Priority priority) {
+ resetSchedulingOpportunities(priority, System.currentTimeMillis());
+ }
+ // used for continuous scheduling
+ public synchronized void resetSchedulingOpportunities(Priority priority,
+ long currentTimeMs) {
+ lastScheduledContainer.put(priority, currentTimeMs);
+ schedulingOpportunities.setCount(priority, 0);
+ }
}
Modified:
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1535608&r1=1535607&r2=1535608&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
(original)
+++
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
Fri Oct 25 01:56:56 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -72,4 +73,11 @@ public abstract class SchedulerNode {
* @return total resources on the node.
*/
public abstract Resource getTotalResource();
+
+ /**
+ * Get the ID of the node which contains both its hostname and port.
+ * @return the ID of the node
+ */
+ public abstract NodeId getNodeID();
+
}
Modified:
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1535608&r1=1535607&r2=1535608&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
(original)
+++
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
Fri Oct 25 01:56:56 2013
@@ -18,22 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -41,194 +35,39 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
/**
- * Represents an Application from the viewpoint of the scheduler.
- * Each running Application in the RM corresponds to one instance
- * of this class.
+ * Represents an application attempt from the viewpoint of the FIFO or Capacity
+ * scheduler.
*/
-@SuppressWarnings("unchecked")
@Private
@Unstable
public class FiCaSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
- private final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private final AppSchedulingInfo appSchedulingInfo;
- private final Queue queue;
-
- private final Resource currentConsumption = recordFactory
- .newRecordInstance(Resource.class);
- private Resource resourceLimit = recordFactory
- .newRecordInstance(Resource.class);
-
- private Map<ContainerId, RMContainer> liveContainers =
- new HashMap<ContainerId, RMContainer>();
- private List<RMContainer> newlyAllocatedContainers =
- new ArrayList<RMContainer>();
-
- final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
- new HashMap<Priority, Map<NodeId, RMContainer>>();
-
- private boolean isStopped = false;
-
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
- /**
- * Count how many times the application has been given an opportunity
- * to schedule a task at each priority. Each time the scheduler
- * asks the application for a task at this priority, it is incremented,
- * and each time the application successfully schedules a task, it
- * is reset to 0.
- */
- Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-
- Multiset<Priority> reReservations = HashMultiset.create();
-
- Resource currentReservation = recordFactory
- .newRecordInstance(Resource.class);
-
- private final RMContext rmContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
- this.rmContext = rmContext;
- this.appSchedulingInfo =
- new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
- this.queue = queue;
- }
-
- public ApplicationId getApplicationId() {
- return this.appSchedulingInfo.getApplicationId();
- }
-
- @Override
- public ApplicationAttemptId getApplicationAttemptId() {
- return this.appSchedulingInfo.getApplicationAttemptId();
- }
-
- public String getUser() {
- return this.appSchedulingInfo.getUser();
- }
-
- public synchronized void updateResourceRequests(
- List<ResourceRequest> requests) {
- if (!isStopped) {
- this.appSchedulingInfo.updateResourceRequests(requests);
- }
- }
-
- public synchronized void updateBlacklist(
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
- if (!isStopped) {
- this.appSchedulingInfo.updateBlacklist(
- blacklistAdditions, blacklistRemovals);
- }
- }
-
- public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
- return this.appSchedulingInfo.getResourceRequests(priority);
- }
-
- public int getNewContainerId() {
- return this.appSchedulingInfo.getNewContainerId();
- }
-
- public Collection<Priority> getPriorities() {
- return this.appSchedulingInfo.getPriorities();
- }
-
- public ResourceRequest getResourceRequest(Priority priority, String
resourceName) {
- return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
- }
-
- public synchronized int getTotalRequiredResources(Priority priority) {
- return getResourceRequest(priority,
ResourceRequest.ANY).getNumContainers();
- }
-
- public Resource getResource(Priority priority) {
- return this.appSchedulingInfo.getResource(priority);
- }
-
- public boolean isBlacklisted(String resourceName) {
- return this.appSchedulingInfo.isBlacklisted(resourceName);
- }
-
- /**
- * Is this application pending?
- * @return true if it is else false.
- */
- @Override
- public boolean isPending() {
- return this.appSchedulingInfo.isPending();
- }
-
- public synchronized boolean isStopped() {
- return this.isStopped;
- }
-
- public String getQueueName() {
- return this.appSchedulingInfo.getQueueName();
- }
-
- /**
- * Get the list of live containers
- * @return All of the live containers
- */
- @Override
- public synchronized Collection<RMContainer> getLiveContainers() {
- return new ArrayList<RMContainer>(liveContainers.values());
- }
-
- public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
- // Cleanup all scheduling information
- this.isStopped = true;
- this.appSchedulingInfo.stop(rmAppAttemptFinalState);
- }
-
- public synchronized void containerLaunchedOnNode(ContainerId containerId,
- NodeId nodeId) {
- // Inform the container
- RMContainer rmContainer =
- getRMContainer(containerId);
- if (rmContainer == null) {
- // Some unknown container sneaked into the system. Kill it.
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
- return;
- }
-
- rmContainer.handle(new RMContainerEvent(containerId,
- RMContainerEventType.LAUNCHED));
+ super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -310,133 +149,6 @@ public class FiCaSchedulerApp extends Sc
return rmContainer;
}
-
- synchronized public List<Container> pullNewlyAllocatedContainers() {
- List<Container> returnContainerList = new ArrayList<Container>(
- newlyAllocatedContainers.size());
- for (RMContainer rmContainer : newlyAllocatedContainers) {
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
- RMContainerEventType.ACQUIRED));
- returnContainerList.add(rmContainer.getContainer());
- }
- newlyAllocatedContainers.clear();
- return returnContainerList;
- }
-
- public Resource getCurrentConsumption() {
- return this.currentConsumption;
- }
-
- synchronized public void showRequests() {
- if (LOG.isDebugEnabled()) {
- for (Priority priority : getPriorities()) {
- Map<String, ResourceRequest> requests = getResourceRequests(priority);
- if (requests != null) {
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
- " headRoom=" + getHeadroom() +
- " currentConsumption=" + currentConsumption.getMemory());
- for (ResourceRequest request : requests.values()) {
- LOG.debug("showRequests:" + " application=" + getApplicationId()
- + " request=" + request);
- }
- }
- }
- }
- }
-
- public synchronized RMContainer getRMContainer(ContainerId id) {
- return liveContainers.get(id);
- }
-
- synchronized public void resetSchedulingOpportunities(Priority priority) {
- this.schedulingOpportunities.setCount(priority, 0);
- }
-
- synchronized public void addSchedulingOpportunity(Priority priority) {
- this.schedulingOpportunities.setCount(priority,
- schedulingOpportunities.count(priority) + 1);
- }
-
- synchronized public void subtractSchedulingOpportunity(Priority priority) {
- int count = schedulingOpportunities.count(priority) - 1;
- this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
- }
-
- /**
- * @param priority Target priority
- * @return the number of times the application has been given an opportunity
- * to schedule a task at the given priority since the last time it
- * successfully did so.
- */
- synchronized public int getSchedulingOpportunities(Priority priority) {
- return this.schedulingOpportunities.count(priority);
- }
-
- synchronized void resetReReservations(Priority priority) {
- this.reReservations.setCount(priority, 0);
- }
-
- synchronized void addReReservation(Priority priority) {
- this.reReservations.add(priority);
- }
-
- synchronized public int getReReservations(Priority priority) {
- return this.reReservations.count(priority);
- }
-
- public synchronized int getNumReservedContainers(Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- return (reservedContainers == null) ? 0 : reservedContainers.size();
- }
-
- /**
- * Get total current reservations.
- * Used only by unit tests
- * @return total current reservations
- */
- @Stable
- @Private
- public synchronized Resource getCurrentReservation() {
- return currentReservation;
- }
-
- public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority
priority,
- RMContainer rmContainer, Container container) {
- // Create RMContainer if necessary
- if (rmContainer == null) {
- rmContainer =
- new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
-
- Resources.addTo(currentReservation, container.getResource());
-
- // Reset the re-reservation count
- resetReReservations(priority);
- } else {
- // Note down the re-reservation
- addReReservation(priority);
- }
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
- container.getResource(), node.getNodeID(), priority));
-
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers == null) {
- reservedContainers = new HashMap<NodeId, RMContainer>();
- this.reservedContainers.put(priority, reservedContainers);
- }
- reservedContainers.put(node.getNodeID(), rmContainer);
-
- LOG.info("Application " + getApplicationId()
- + " reserved container " + rmContainer
- + " on node " + node + ", currently has " + reservedContainers.size()
- + " at priority " + priority
- + "; currentReservation " + currentReservation.getMemory());
-
- return rmContainer;
- }
public synchronized boolean unreserve(FiCaSchedulerNode node, Priority
priority) {
Map<NodeId, RMContainer> reservedContainers =
@@ -470,22 +182,6 @@ public class FiCaSchedulerApp extends Sc
return false;
}
- /**
- * Has the application reserved the given <code>node</code> at the
- * given <code>priority</code>?
- * @param node node to be checked
- * @param priority priority of reserved container
- * @return true is reserved, false if not
- */
- public synchronized boolean isReserved(FiCaSchedulerNode node, Priority
priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers != null) {
- return reservedContainers.containsKey(node.getNodeID());
- }
- return false;
- }
-
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
@@ -497,41 +193,6 @@ public class FiCaSchedulerApp extends Sc
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
- /**
- * Get the list of reserved containers
- * @return All of the reserved containers.
- */
- @Override
- public synchronized List<RMContainer> getReservedContainers() {
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
- this.reservedContainers.entrySet()) {
- reservedContainers.addAll(e.getValue().values());
- }
- return reservedContainers;
- }
-
- public synchronized void setHeadroom(Resource globalLimit) {
- this.resourceLimit = globalLimit;
- }
-
- /**
- * Get available headroom in terms of resources for the application's user.
- * @return available resource headroom
- */
- public synchronized Resource getHeadroom() {
- // Corner case to deal with applications being slightly over-limit
- if (resourceLimit.getMemory() < 0) {
- resourceLimit.setMemory(0);
- }
-
- return resourceLimit;
- }
-
- public Queue getQueue() {
- return queue;
- }
-
public Resource getTotalPendingRequests() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
Modified:
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1535608&r1=1535607&r2=1535608&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
(original)
+++
hadoop/common/branches/HDFS-4949/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Fri Oct 25 01:56:56 2013
@@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,7 +27,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -38,92 +34,39 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-
+/**
+ * Represents an application attempt from the viewpoint of the Fair Scheduler.
+ */
@Private
@Unstable
public class FSSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
- private final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private final AppSchedulingInfo appSchedulingInfo;
private AppSchedulable appSchedulable;
- private final Queue queue;
-
- private final Resource currentConsumption = recordFactory
- .newRecordInstance(Resource.class);
- private Resource resourceLimit = recordFactory
- .newRecordInstance(Resource.class);
-
- private Map<ContainerId, RMContainer> liveContainers
- = new HashMap<ContainerId, RMContainer>();
- private List<RMContainer> newlyAllocatedContainers =
- new ArrayList<RMContainer>();
-
- final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
- new HashMap<Priority, Map<NodeId, RMContainer>>();
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer,
Long>();
-
- /**
- * Count how many times the application has been given an opportunity
- * to schedule a task at each priority. Each time the scheduler
- * asks the application for a task at this priority, it is incremented,
- * and each time the application successfully schedules a task, it
- * is reset to 0.
- */
- Multiset<Priority> schedulingOpportunities = HashMultiset.create();
- Multiset<Priority> reReservations = HashMultiset.create();
-
- Resource currentReservation = recordFactory
- .newRecordInstance(Resource.class);
-
- private final RMContext rmContext;
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
- this.rmContext = rmContext;
- this.appSchedulingInfo =
- new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
- this.queue = queue;
- }
-
- public ApplicationId getApplicationId() {
- return appSchedulingInfo.getApplicationId();
+ super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
- @Override
- public ApplicationAttemptId getApplicationAttemptId() {
- return appSchedulingInfo.getApplicationAttemptId();
- }
-
public void setAppSchedulable(AppSchedulable appSchedulable) {
this.appSchedulable = appSchedulable;
}
@@ -132,83 +75,6 @@ public class FSSchedulerApp extends Sche
return appSchedulable;
}
- public String getUser() {
- return appSchedulingInfo.getUser();
- }
-
- public synchronized void updateResourceRequests(
- List<ResourceRequest> requests) {
- this.appSchedulingInfo.updateResourceRequests(requests);
- }
-
- public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
- return appSchedulingInfo.getResourceRequests(priority);
- }
-
- public int getNewContainerId() {
- return appSchedulingInfo.getNewContainerId();
- }
-
- public Collection<Priority> getPriorities() {
- return appSchedulingInfo.getPriorities();
- }
-
- public ResourceRequest getResourceRequest(Priority priority, String
nodeAddress) {
- return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
- }
-
- public synchronized int getTotalRequiredResources(Priority priority) {
- return getResourceRequest(priority,
ResourceRequest.ANY).getNumContainers();
- }
-
- public Resource getResource(Priority priority) {
- return appSchedulingInfo.getResource(priority);
- }
-
- /**
- * Is this application pending?
- * @return true if it is else false.
- */
- @Override
- public boolean isPending() {
- return appSchedulingInfo.isPending();
- }
-
- public String getQueueName() {
- return appSchedulingInfo.getQueueName();
- }
-
- /**
- * Get the list of live containers
- * @return All of the live containers
- */
- @Override
- public synchronized Collection<RMContainer> getLiveContainers() {
- return new ArrayList<RMContainer>(liveContainers.values());
- }
-
- public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
- // Cleanup all scheduling information
- appSchedulingInfo.stop(rmAppAttemptFinalState);
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void containerLaunchedOnNode(ContainerId containerId,
- NodeId nodeId) {
- // Inform the container
- RMContainer rmContainer =
- getRMContainer(containerId);
- if (rmContainer == null) {
- // Some unknown container sneaked into the system. Kill it.
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
- return;
- }
-
- rmContainer.handle(new RMContainerEvent(containerId,
- RMContainerEventType.LAUNCHED));
- }
-
synchronized public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
@@ -241,122 +107,6 @@ public class FSSchedulerApp extends Sche
preemptionMap.remove(rmContainer);
}
- synchronized public List<Container> pullNewlyAllocatedContainers() {
- List<Container> returnContainerList = new ArrayList<Container>(
- newlyAllocatedContainers.size());
- for (RMContainer rmContainer : newlyAllocatedContainers) {
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
- RMContainerEventType.ACQUIRED));
- returnContainerList.add(rmContainer.getContainer());
- }
- newlyAllocatedContainers.clear();
- return returnContainerList;
- }
-
- public Resource getCurrentConsumption() {
- return this.currentConsumption;
- }
-
- synchronized public void showRequests() {
- if (LOG.isDebugEnabled()) {
- for (Priority priority : getPriorities()) {
- Map<String, ResourceRequest> requests = getResourceRequests(priority);
- if (requests != null) {
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
- " headRoom=" + getHeadroom() +
- " currentConsumption=" + currentConsumption.getMemory());
- for (ResourceRequest request : requests.values()) {
- LOG.debug("showRequests:" + " application=" + getApplicationId()
- + " request=" + request);
- }
- }
- }
- }
- }
-
- public synchronized RMContainer getRMContainer(ContainerId id) {
- return liveContainers.get(id);
- }
-
- synchronized public void addSchedulingOpportunity(Priority priority) {
- schedulingOpportunities.setCount(priority,
- schedulingOpportunities.count(priority) + 1);
- }
-
- /**
- * Return the number of times the application has been given an opportunity
- * to schedule a task at the given priority since the last time it
- * successfully did so.
- */
- synchronized public int getSchedulingOpportunities(Priority priority) {
- return schedulingOpportunities.count(priority);
- }
-
- synchronized void resetReReservations(Priority priority) {
- reReservations.setCount(priority, 0);
- }
-
- synchronized void addReReservation(Priority priority) {
- reReservations.add(priority);
- }
-
- synchronized public int getReReservations(Priority priority) {
- return reReservations.count(priority);
- }
-
- public synchronized int getNumReservedContainers(Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- return (reservedContainers == null) ? 0 : reservedContainers.size();
- }
-
- /**
- * Get total current reservations.
- * Used only by unit tests
- * @return total current reservations
- */
- @VisibleForTesting
- public synchronized Resource getCurrentReservation() {
- return currentReservation;
- }
-
- public synchronized RMContainer reserve(FSSchedulerNode node, Priority
priority,
- RMContainer rmContainer, Container container) {
- // Create RMContainer if necessary
- if (rmContainer == null) {
- rmContainer =
- new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
-
- Resources.addTo(currentReservation, container.getResource());
-
- // Reset the re-reservation count
- resetReReservations(priority);
- } else {
- // Note down the re-reservation
- addReReservation(priority);
- }
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
- container.getResource(), node.getNodeID(), priority));
-
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers == null) {
- reservedContainers = new HashMap<NodeId, RMContainer>();
- this.reservedContainers.put(priority, reservedContainers);
- }
- reservedContainers.put(node.getNodeID(), rmContainer);
-
- LOG.info("Application " + getApplicationId()
- + " reserved container " + rmContainer
- + " on node " + node + ", currently has " + reservedContainers.size()
- + " at priority " + priority
- + "; currentReservation " + currentReservation.getMemory());
-
- return rmContainer;
- }
-
public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
@@ -376,22 +126,6 @@ public class FSSchedulerApp extends Sche
+ priority + "; currentReservation " + currentReservation);
}
- /**
- * Has the application reserved the given <code>node</code> at the
- * given <code>priority</code>?
- * @param node node to be checked
- * @param priority priority of reserved container
- * @return true is reserved, false if not
- */
- public synchronized boolean isReserved(FSSchedulerNode node, Priority
priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers != null) {
- return reservedContainers.containsKey(node.getNodeID());
- }
- return false;
- }
-
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
@@ -402,42 +136,7 @@ public class FSSchedulerApp extends Sche
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
-
- /**
- * Get the list of reserved containers
- * @return All of the reserved containers.
- */
- @Override
- public synchronized List<RMContainer> getReservedContainers() {
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
- this.reservedContainers.entrySet()) {
- reservedContainers.addAll(e.getValue().values());
- }
- return reservedContainers;
- }
- public synchronized void setHeadroom(Resource globalLimit) {
- this.resourceLimit = globalLimit;
- }
-
- /**
- * Get available headroom in terms of resources for the application's user.
- * @return available resource headroom
- */
- public synchronized Resource getHeadroom() {
- // Corner case to deal with applications being slightly over-limit
- if (resourceLimit.getMemory() < 0) {
- resourceLimit.setMemory(0);
- }
-
- return resourceLimit;
- }
-
- public Queue getQueue() {
- return queue;
- }
-
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To acheive this
@@ -453,26 +152,6 @@ public class FSSchedulerApp extends Sche
final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
Priority, NodeType>();
- // Time of the last container scheduled at the current allowed level
- Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
-
- /**
- * Should be called when an application has successfully scheduled a
container,
- * or when the scheduling locality threshold is relaxed.
- * Reset various internal counters which affect delay scheduling
- *
- * @param priority The priority of the container scheduled.
- */
- synchronized public void resetSchedulingOpportunities(Priority priority) {
- resetSchedulingOpportunities(priority, System.currentTimeMillis());
- }
- // used for continuous scheduling
- synchronized public void resetSchedulingOpportunities(Priority priority,
- long currentTimeMs) {
- lastScheduledContainer.put(priority, currentTimeMs);
- schedulingOpportunities.setCount(priority, 0);
- }
-
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to