Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Tue Aug 19 23:49:39 2014 @@ -68,7 +68,9 @@ public class AllocationFileLoaderService * (this is done to prevent loading a file that hasn't been fully written). */ public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000; - + + public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues @@ -96,58 +98,69 @@ public class AllocationFileLoaderService } @Override - public void init(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { this.allocFile = getAllocationFile(conf); - super.init(conf); - } - - @Override - public void start() { - if (allocFile == null) { - return; - } - reloadThread = new Thread() { - public void run() { - while (running) { - long time = clock.getTime(); - long lastModified = allocFile.lastModified(); - if (lastModified > lastSuccessfulReload && - time > lastModified + ALLOC_RELOAD_WAIT_MS) { - try { - reloadAllocations(); - } catch (Exception ex) { + if (allocFile != null) { + reloadThread = new Thread() { + @Override + public void run() { + while (running) { + long time = clock.getTime(); + long lastModified = allocFile.lastModified(); + if (lastModified > lastSuccessfulReload && + time > lastModified + ALLOC_RELOAD_WAIT_MS) { + try { + reloadAllocations(); + } catch (Exception ex) { + if (!lastReloadAttemptFailed) { + LOG.error("Failed to reload fair scheduler config file - " + + "will use existing allocations.", ex); + } + lastReloadAttemptFailed = true; + } + } else if (lastModified == 0l) { if (!lastReloadAttemptFailed) { - LOG.error("Failed to reload fair scheduler config file - " + - "will use existing allocations.", ex); + LOG.warn("Failed to reload fair scheduler config file because" + + " last modified returned 0. File exists: " + + allocFile.exists()); } lastReloadAttemptFailed = true; } - } else if (lastModified == 0l) { - if (!lastReloadAttemptFailed) { - LOG.warn("Failed to reload fair scheduler config file because" + - " last modified returned 0. File exists: " + allocFile.exists()); + try { + Thread.sleep(reloadIntervalMs); + } catch (InterruptedException ex) { + LOG.info( + "Interrupted while waiting to reload alloc configuration"); } - lastReloadAttemptFailed = true; - } - try { - Thread.sleep(reloadIntervalMs); - } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting to reload alloc configuration"); } } - } - }; - reloadThread.setName("AllocationFileReloader"); - reloadThread.setDaemon(true); - reloadThread.start(); - super.start(); + }; + reloadThread.setName("AllocationFileReloader"); + reloadThread.setDaemon(true); + } + super.serviceInit(conf); } @Override - public void stop() { + public void serviceStart() throws Exception { + if (reloadThread != null) { + reloadThread.start(); + } + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { running = false; - reloadThread.interrupt(); - super.stop(); + if (reloadThread != null) { + reloadThread.interrupt(); + try { + reloadThread.join(THREAD_JOIN_TIMEOUT_MS); + } catch (InterruptedException e) { + LOG.warn("reloadThread fails to join."); + } + } + super.serviceStop(); } /** @@ -200,6 +213,7 @@ public class AllocationFileLoaderService Map<String, Resource> maxQueueResources = new HashMap<String, Resource>(); Map<String, Integer> queueMaxApps = new HashMap<String, Integer>(); Map<String, Integer> userMaxApps = new HashMap<String, Integer>(); + Map<String, Float> queueMaxAMShares = new HashMap<String, Float>(); Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>(); Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>(); Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>(); @@ -207,6 +221,7 @@ public class AllocationFileLoaderService new HashMap<String, Map<QueueACL, AccessControlList>>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + float queueMaxAMShareDefault = -1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; @@ -214,8 +229,15 @@ public class AllocationFileLoaderService QueuePlacementPolicy newPlacementPolicy = null; // Remember all queue names so we can display them on web UI, etc. - Set<String> queueNamesInAllocFile = new HashSet<String>(); - + // configuredQueues is segregated based on whether it is a leaf queue + // or a parent queue. This information is used for creating queues + // and also for making queue placement decisions(QueuePlacementRule.java). + Map<FSQueueType, Set<String>> configuredQueues = + new HashMap<FSQueueType, Set<String>>(); + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet<String>()); + } + // Read and parse the allocations file. DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); @@ -266,6 +288,11 @@ public class AllocationFileLoaderService String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShareDefault = val; } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -289,26 +316,27 @@ public class AllocationFileLoaderService } parent = null; } - loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + loadQueue(parent, element, minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); } // Load placement policy and pass it configured queues Configuration conf = getConfig(); if (placementPolicyElement != null) { newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, - queueNamesInAllocFile, conf); + configuredQueues, conf); } else { newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - queueNamesInAllocFile); + configuredQueues); } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, + queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, - newPlacementPolicy, queueNamesInAllocFile); + newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -321,10 +349,12 @@ public class AllocationFileLoaderService */ private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources, Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps, - Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights, + Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares, + Map<String, ResourceWeights> queueWeights, Map<String, SchedulingPolicy> queuePolicies, Map<String, Long> minSharePreemptionTimeouts, - Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile) + Map<String, Map<QueueACL, AccessControlList>> queueAcls, + Map<FSQueueType, Set<String>> configuredQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name"); if (parentName != null) { @@ -352,6 +382,11 @@ public class AllocationFileLoaderService String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShares.put(queueName, val); } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); @@ -374,14 +409,21 @@ public class AllocationFileLoaderService } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); + configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } } if (isLeaf) { - queueNamesInAllocFile.add(queueName); + // if a leaf in the alloc file is marked as type='parent' + // then store it under 'parent' + if ("parent".equals(element.getAttribute("type"))) { + configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else { + configuredQueues.get(FSQueueType.LEAF).add(queueName); + } } queueAcls.put(queueName, acls); if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Aug 19 23:49:39 2014 @@ -33,21 +33,22 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable public class FSLeafQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); - - private final List<AppSchedulable> runnableAppScheds = // apps that are runnable - new ArrayList<AppSchedulable>(); - private final List<AppSchedulable> nonRunnableAppScheds = - new ArrayList<AppSchedulable>(); + + private final List<FSAppAttempt> runnableApps = // apps that are runnable + new ArrayList<FSAppAttempt>(); + private final List<FSAppAttempt> nonRunnableApps = + new ArrayList<FSAppAttempt>(); private Resource demand = Resources.createResource(0); @@ -55,6 +56,9 @@ public class FSLeafQueue extends FSQueue private long lastTimeAtMinShare; private long lastTimeAtHalfFairShare; + // Track the AM resource usage for this queue + private Resource amResourceUsage; + private final ActiveUsersManager activeUsersManager; public FSLeafQueue(String name, FairScheduler scheduler, @@ -63,31 +67,34 @@ public class FSLeafQueue extends FSQueue this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); + amResourceUsage = Resource.newInstance(0, 0); } - public void addApp(FSSchedulerApp app, boolean runnable) { - AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); - app.setAppSchedulable(appSchedulable); + public void addApp(FSAppAttempt app, boolean runnable) { if (runnable) { - runnableAppScheds.add(appSchedulable); + runnableApps.add(app); } else { - nonRunnableAppScheds.add(appSchedulable); + nonRunnableApps.add(app); } } // for testing - void addAppSchedulable(AppSchedulable appSched) { - runnableAppScheds.add(appSched); + void addAppSchedulable(FSAppAttempt appSched) { + runnableApps.add(appSched); } /** * Removes the given app from this queue. * @return whether or not the app was runnable */ - public boolean removeApp(FSSchedulerApp app) { - if (runnableAppScheds.remove(app.getAppSchedulable())) { + public boolean removeApp(FSAppAttempt app) { + if (runnableApps.remove(app)) { + // Update AM resource usage + if (app.isAmRunning() && app.getAMResource() != null) { + Resources.subtractFrom(amResourceUsage, app.getAMResource()); + } return true; - } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { + } else if (nonRunnableApps.remove(app)) { return false; } else { throw new IllegalStateException("Given app to remove " + app + @@ -95,22 +102,22 @@ public class FSLeafQueue extends FSQueue } } - public Collection<AppSchedulable> getRunnableAppSchedulables() { - return runnableAppScheds; + public Collection<FSAppAttempt> getRunnableAppSchedulables() { + return runnableApps; } - public List<AppSchedulable> getNonRunnableAppSchedulables() { - return nonRunnableAppScheds; + public List<FSAppAttempt> getNonRunnableAppSchedulables() { + return nonRunnableApps; } @Override public void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { - for (AppSchedulable appSched : runnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); } - for (AppSchedulable appSched : nonRunnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); } } @@ -136,15 +143,19 @@ public class FSLeafQueue extends FSQueue @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (AppSchedulable app : runnableAppScheds) { + for (FSAppAttempt app : runnableApps) { Resources.addTo(usage, app.getResourceUsage()); } - for (AppSchedulable app : nonRunnableAppScheds) { + for (FSAppAttempt app : nonRunnableApps) { Resources.addTo(usage, app.getResourceUsage()); } return usage; } + public Resource getAmResourceUsage() { + return amResourceUsage; + } + @Override public void updateDemand() { // Compute demand by iterating through apps in the queue @@ -152,13 +163,13 @@ public class FSLeafQueue extends FSQueue Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (AppSchedulable sched : runnableAppScheds) { + for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { break; } updateDemandForApp(sched, maxRes); } - for (AppSchedulable sched : nonRunnableAppScheds) { + for (FSAppAttempt sched : nonRunnableApps) { if (Resources.equals(demand, maxRes)) { break; } @@ -170,7 +181,7 @@ public class FSLeafQueue extends FSQueue } } - private void updateDemandForApp(AppSchedulable sched, Resource maxRes) { + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { @@ -194,9 +205,9 @@ public class FSLeafQueue extends FSQueue } Comparator<Schedulable> comparator = policy.getComparator(); - Collections.sort(runnableAppScheds, comparator); - for (AppSchedulable sched : runnableAppScheds) { - if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) { + Collections.sort(runnableApps, comparator); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; } @@ -209,6 +220,37 @@ public class FSLeafQueue extends FSQueue } @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + + // If this queue is not over its fair share, reject + if (!preemptContainerPreCheck()) { + return toBePreempted; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getName() + " is going to preempt a container " + + "from its applications."); + } + + // Choose the app that is most over fair share + Comparator<Schedulable> comparator = policy.getComparator(); + FSAppAttempt candidateSched = null; + for (FSAppAttempt sched : runnableApps) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } + } + + // Preempt from the selected app + if (candidateSched != null) { + toBePreempted = candidateSched.preemptContainer(); + } + return toBePreempted; + } + + @Override public List<FSQueue> getChildQueues() { return new ArrayList<FSQueue>(1); } @@ -247,11 +289,52 @@ public class FSLeafQueue extends FSQueue @Override public int getNumRunnableApps() { - return runnableAppScheds.size(); + return runnableApps.size(); } @Override public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + /** + * Check whether this queue can run this application master under the + * maxAMShare limit + * + * @param amResource + * @return true if this queue can run + */ + public boolean canRunAppAM(Resource amResource) { + float maxAMShare = + scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); + if (Math.abs(maxAMShare - -1.0f) < 0.0001) { + return true; + } + Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); + Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); + return !policy + .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource); + } + + public void addAMResourceUsage(Resource amResource) { + if (amResource != null) { + Resources.addTo(amResourceUsage, amResource); + } + } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + } + + /** + * Helper method to check if the queue should preempt containers + * + * @return true if check passes (can preempt) or false otherwise + */ + private boolean preemptContainerPreCheck() { + return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), + getFairShare()); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Tue Aug 19 23:49:39 2014 @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import org.apache.commons.logging.Log; @@ -32,8 +33,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable @@ -157,6 +161,27 @@ public class FSParentQueue extends FSQue } @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + + // Find the childQueue which is most over fair share + FSQueue candidateQueue = null; + Comparator<Schedulable> comparator = policy.getComparator(); + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } + } + + // Let the selected queue choose which of its container to preempt + if (candidateQueue != null) { + toBePreempted = candidateQueue.preemptContainer(); + } + return toBePreempted; + } + + @Override public List<FSQueue> getChildQueues() { return childQueues; } @@ -200,4 +225,11 @@ public class FSParentQueue extends FSQue // Should never be called since all applications are submitted to LeafQueues return null; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Tue Aug 19 23:49:39 2014 @@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resou @Private @Unstable -public abstract class FSQueue extends Schedulable implements Queue { +public abstract class FSQueue implements Queue, Schedulable { + private Resource fairShare = Resources.createResource(0, 0); private final String name; protected final FairScheduler scheduler; private final FSQueueMetrics metrics; @@ -119,9 +120,9 @@ public abstract class FSQueue extends Sc // TODO: we might change these queue metrics around a little bit // to match the semantics of the fair scheduler. queueInfo.setCapacity((float) getFairShare().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); queueInfo.setCapacity((float) getResourceUsage().getMemory() / - scheduler.getClusterCapacity().getMemory()); + scheduler.getClusterResource().getMemory()); ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>(); if (includeChildQueues) { @@ -139,10 +140,15 @@ public abstract class FSQueue extends Sc public FSQueueMetrics getMetrics() { return metrics; } - + + /** Get the fair share assigned to this Schedulable. */ + public Resource getFairShare() { + return fairShare; + } + @Override public void setFairShare(Resource fairShare) { - super.setFairShare(fairShare); + this.fairShare = fairShare; metrics.setFairShare(fairShare); } @@ -187,4 +193,16 @@ public abstract class FSQueue extends Sc } return true; } + + @Override + public boolean isActive() { + return getNumRunnableApps() > 0; + } + + /** Convenient toString implementation for debugging. */ + @Override + public String toString() { + return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", + getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Tue Aug 19 23:49:39 2014 @@ -18,28 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -47,208 +35,56 @@ public class FSSchedulerNode extends Sch private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - - private Resource availableResource; - private Resource usedResource = recordFactory.newRecordInstance(Resource.class); - private Resource totalResourceCapability; - - private volatile int numContainers; - - private RMContainer reservedContainer; - private AppSchedulable reservedAppSchedulable; - - /* set of containers that are allocated containers */ - private final Map<ContainerId, RMContainer> launchedContainers = - new HashMap<ContainerId, RMContainer>(); - - private final RMNode rmNode; - private final String nodeName; + private FSAppAttempt reservedAppSchedulable; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { - this.rmNode = node; - this.availableResource = Resources.clone(node.getTotalCapability()); - totalResourceCapability = - Resource.newInstance(node.getTotalCapability().getMemory(), node - .getTotalCapability().getVirtualCores()); - if (usePortForNodeName) { - nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); - } else { - nodeName = rmNode.getHostName(); - } - } - - public RMNode getRMNode() { - return rmNode; - } - - public NodeId getNodeID() { - return rmNode.getNodeID(); - } - - public String getHttpAddress() { - return rmNode.getHttpAddress(); - } - - @Override - public String getNodeName() { - return nodeName; + super(node, usePortForNodeName); } @Override - public String getRackName() { - return rmNode.getRackName(); - } - - /** - * The Scheduler has allocated containers on this node to the - * given application. - * - * @param applicationId application - * @param rmContainer allocated container - */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { - Container container = rmContainer.getContainer(); - deductAvailableResource(container.getResource()); - ++numContainers; - - launchedContainers.put(container.getId(), rmContainer); - - LOG.info("Assigned container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + - getAvailableResource() + " available"); - } - - @Override - public synchronized Resource getAvailableResource() { - return availableResource; - } - - @Override - public synchronized Resource getUsedResource() { - return usedResource; - } - - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { - return true; - } - return false; - } - - private synchronized void updateResource(Container container) { - addAvailableResource(container.getResource()); - --numContainers; - } - - /** - * Release an allocated container on this node. - * @param container container to be released - */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { - LOG.error("Invalid container released " + container); - return; - } - - /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); - - LOG.info("Released container " + container.getId() + - " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + - ", which currently has " + numContainers + " containers, " + - getUsedResource() + " used and " + getAvailableResource() - + " available" + ", release resources=" + true); - } - - - private synchronized void addAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid resource addition of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.addTo(availableResource, resource); - Resources.subtractFrom(usedResource, resource); - } - - @Override - public Resource getTotalResource() { - return this.totalResourceCapability; - } - - private synchronized void deductAvailableResource(Resource resource) { - if (resource == null) { - LOG.error("Invalid deduction of null resource for " - + rmNode.getNodeAddress()); - return; - } - Resources.subtractFrom(availableResource, resource); - Resources.addTo(usedResource, resource); - } - - @Override - public String toString() { - return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource() + - " used=" + getUsedResource(); - } - - @Override - public int getNumContainers() { - return numContainers; - } - - public synchronized List<RMContainer> getRunningContainers() { - return new ArrayList<RMContainer>(launchedContainers.values()); - } - public synchronized void reserveResource( - FSSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + SchedulerApplicationAttempt application, Priority priority, + RMContainer container) { // Check if it's already reserved - if (this.reservedContainer != null) { + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { // Sanity check - if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) { + if (!container.getContainer().getNodeId().equals(getNodeID())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + - " on node " + reservedContainer.getReservedNode() + - " when currently" + " reserved resource " + this.reservedContainer + - " on node " + this.reservedContainer.getReservedNode()); + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); } // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals(container.getContainer().getId().getApplicationAttemptId())) { throw new IllegalStateException("Trying to reserve" + - " container " + reservedContainer + + " container " + container + " for application " + application.getApplicationId() + " when currently" + - " reserved container " + this.reservedContainer + + " reserved container " + reservedContainer + " on node " + this); } LOG.info("Updated reserved container " + - reservedContainer.getContainer().getId() + " on node " + + container.getContainer().getId() + " on node " + this + " for application " + application); } else { - LOG.info("Reserved container " + reservedContainer.getContainer().getId() + + LOG.info("Reserved container " + container.getContainer().getId() + " on node " + this + " for application " + application); } - this.reservedContainer = reservedContainer; - this.reservedAppSchedulable = application.getAppSchedulable(); + setReservedContainer(container); + this.reservedAppSchedulable = (FSAppAttempt) application; } + @Override public synchronized void unreserveResource( - FSSchedulerApp application) { + SchedulerApplicationAttempt application) { // Cannot unreserve for wrong application... ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); + getReservedContainer().getContainer().getId().getApplicationAttemptId(); if (!reservedApplication.equals( application.getApplicationAttemptId())) { throw new IllegalStateException("Trying to unreserve " + @@ -258,22 +94,11 @@ public class FSSchedulerNode extends Sch " on node " + this); } - this.reservedContainer = null; + setReservedContainer(null); this.reservedAppSchedulable = null; } - public synchronized RMContainer getReservedContainer() { - return reservedContainer; - } - - public synchronized AppSchedulable getReservedAppSchedulable() { + public synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } - - @Override - public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { - // we can only adjust available resource if total resource is changed. - Resources.addTo(this.availableResource, deltaResource); - } - }
