Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Sat Aug 16 21:02:21 2014 @@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer { // Tracks the number of running applications by user. private final Map<String, Integer> usersNumRunnableApps; @VisibleForTesting - final ListMultimap<String, AppSchedulable> usersNonRunnableApps; + final ListMultimap<String, FSAppAttempt> usersNonRunnableApps; public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; @@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer { * Tracks the given new runnable app for purposes of maintaining max running * app limits. */ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer { * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSAppAttempt app) { String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); + usersNonRunnableApps.put(user, app); } /** @@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer { * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // childqueueX might have no pending apps itself, but if a queue higher up @@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer { parent = parent.getParent(); } - List<List<AppSchedulable>> appsNowMaybeRunnable = - new ArrayList<List<AppSchedulable>>(); + List<List<FSAppAttempt>> appsNowMaybeRunnable = + new ArrayList<List<FSAppAttempt>>(); // Compile lists of apps which may now be runnable // We gather lists instead of building a set of all non-runnable apps so @@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer { userNumRunning = 0; } if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user); + List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); } } // Scan through and check whether this means that any apps are now runnable - Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( + Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>(); + FSAppAttempt prev = null; + List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>(); while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSAppAttempt next = iter.next(); if (next == prev) { continue; } if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); + FSAppAttempt appSched = next; next.getQueue().getRunnableAppSchedulables().add(appSched); noLongerPendingApps.add(appSched); @@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer { // We remove the apps from their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + for (FSAppAttempt appSched : noLongerPendingApps) { + if (!appSched.getQueue().getNonRunnableAppSchedulables() .remove(appSched)) { LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " + "usersNonRunnableApps, but was not. This should never happen."); } @@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer { * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. */ - public void untrackRunnableApp(FSSchedulerApp app) { + public void untrackRunnableApp(FSAppAttempt app) { // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer { /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + public void untrackNonRunnableApp(FSAppAttempt app) { + usersNonRunnableApps.remove(app.getUser(), app); } /** @@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer { * of non-runnable applications. */ private void gatherPossiblyRunnableAppLists(FSQueue queue, - List<List<AppSchedulable>> appLists) { + List<List<FSAppAttempt>> appLists) { if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { @@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer { * of O(num lists) time. */ static class MultiListStartTimeIterator implements - Iterator<FSSchedulerApp> { + Iterator<FSAppAttempt> { - private List<AppSchedulable>[] appLists; + private List<FSAppAttempt>[] appLists; private int[] curPositionsInAppLists; private PriorityQueue<IndexAndTime> appListsByCurStartTime; @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) { + public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) { appLists = appListList.toArray(new List[appListList.size()]); curPositionsInAppLists = new int[appLists.length]; appListsByCurStartTime = new PriorityQueue<IndexAndTime>(); @@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer { } @Override - public FSSchedulerApp next() { + public FSAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] + FSAppAttempt next = appLists[nextListIndex] .get(curPositionsInAppLists[nextListIndex]); curPositionsInAppLists[nextListIndex]++; @@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer { } appListsByCurStartTime.add(indexAndTime); - return next.getApp(); + return next; } @Override
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Sat Aug 16 21:02:21 2014 @@ -48,7 +48,7 @@ public class NewAppWeightBooster extends super.setConf(conf); } - public double adjustWeight(AppSchedulable app, double curWeight) { + public double adjustWeight(FSAppAttempt app, double curWeight) { long start = app.getStartTime(); long now = System.currentTimeMillis(); if (now - start < duration) { Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Sat Aug 16 21:02:21 2014 @@ -27,20 +27,14 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.util.resource.Resources; /** - * A Schedulable represents an entity that can launch tasks, such as a job - * or a queue. It provides a common interface so that algorithms such as fair - * sharing can be applied both within a queue and across queues. There are - * currently two types of Schedulables: JobSchedulables, which represent a - * single job, and QueueSchedulables, which allocate among jobs in their queue. - * - * Separate sets of Schedulables are used for maps and reduces. Each queue has - * both a mapSchedulable and a reduceSchedulable, and so does each job. + * A Schedulable represents an entity that can be scheduled such as an + * application or a queue. It provides a common interface so that algorithms + * such as fair sharing can be applied both within a queue and across queues. * * A Schedulable is responsible for three roles: - * 1) It can launch tasks through assignTask(). - * 2) It provides information about the job/queue to the scheduler, including: + * 1) Assign resources through {@link #assignContainer}. + * 2) It provides information about the app/queue to the scheduler, including: * - Demand (maximum number of tasks required) - * - Number of currently running tasks * - Minimum share (for queues) * - Job/queue weight (for fair sharing) * - Start time and priority (for FIFO) @@ -57,81 +51,61 @@ import org.apache.hadoop.yarn.util.resou */ @Private @Unstable -public abstract class Schedulable { - /** Fair share assigned to this Schedulable */ - private Resource fairShare = Resources.createResource(0); - +public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ - public abstract String getName(); + public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ - public abstract Resource getDemand(); + public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ - public abstract Resource getResourceUsage(); + public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ - public abstract Resource getMinShare(); + public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ - public abstract Resource getMaxShare(); + public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ - public abstract ResourceWeights getWeights(); + public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ - public abstract long getStartTime(); + public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ - public abstract Priority getPriority(); + public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ - public abstract void updateDemand(); + public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node); + public Resource assignContainer(FSSchedulerNode node); /** * Preempt a container from this Schedulable if possible. */ - public abstract RMContainer preemptContainer(); - - /** Assign a fair share to this Schedulable. */ - public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - } + public RMContainer preemptContainer(); /** Get the fair share assigned to this Schedulable. */ - public Resource getFairShare() { - return fairShare; - } + public Resource getFairShare(); + + /** Assign a fair share to this Schedulable. */ + public void setFairShare(Resource fairShare); /** * Returns true if queue has atleast one app running. Always returns true for * AppSchedulables. */ - public boolean isActive() { - if (this instanceof FSQueue) { - FSQueue queue = (FSQueue) this; - return queue.getNumRunnableApps() > 0; - } - return true; - } - - /** Convenient toString implementation for debugging. */ - @Override - public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); - } + public boolean isActive(); } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Sat Aug 16 21:02:21 2014 @@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab @Private @Unstable public interface WeightAdjuster { - public double adjustWeight(AppSchedulable app, double curWeight); + public double adjustWeight(FSAppAttempt app, double curWeight); } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Sat Aug 16 21:02:21 2014 @@ -46,8 +46,7 @@ public class FairSchedulerInfo extends S } public int getAppFairShare(ApplicationAttemptId appAttemptId) { - return scheduler.getSchedulerApp(appAttemptId). - getAppSchedulable().getFairShare().getMemory(); + return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory(); } public FairSchedulerQueueInfo getRootQueueInfo() { Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java Sat Aug 16 21:02:21 2014 @@ -24,7 +24,8 @@ import javax.xml.bind.annotation.XmlAcce import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; @@ -39,9 +40,9 @@ public class FairSchedulerLeafQueueInfo public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { super(queue, scheduler); - Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables(); - for (AppSchedulable app : apps) { - if (app.getApp().isPending()) { + Collection<FSAppAttempt> apps = queue.getRunnableAppSchedulables(); + for (FSAppAttempt app : apps) { + if (app.isPending()) { numPendingApps++; } else { numActiveApps++; Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Sat Aug 16 21:02:21 2014 @@ -27,7 +27,10 @@ import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AuthenticationFilterInitializer; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,8 +42,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -235,4 +240,75 @@ public class TestResourceManager { } } + @Test(timeout = 50000) + public void testFilterOverrides() throws Exception { + String filterInitializerConfKey = "hadoop.http.filter.initializers"; + String[] filterInitializers = + { + AuthenticationFilterInitializer.class.getName(), + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + "," + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + this.getClass().getName() }; + for (String filterInitializer : filterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + conf.set("hadoop.security.authentication", "kerberos"); + conf.set("hadoop.http.authentication.type", "kerberos"); + try { + try { + UserGroupInformation.setConfiguration(conf); + } catch (Exception e) { + // ignore we just care about getting true for + // isSecurityEnabled() + LOG.info("Got expected exception"); + } + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.contains(this.getClass().getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + this.getClass().getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } + } + + // simple mode overrides + String[] simpleFilterInitializers = + { "", StaticUserWebFilter.class.getName() }; + for (String filterInitializer : simpleFilterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + try { + UserGroupInformation.setConfiguration(conf); + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.equals(StaticUserWebFilter.class.getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + StaticUserWebFilter.class.getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } + } + } + } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Sat Aug 16 21:02:21 2014 @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -107,7 +108,7 @@ public class TestWorkPreservingRMRestart @Parameterized.Parameters public static Collection<Object[]> getTestParameters() { return Arrays.asList(new Object[][] { { CapacityScheduler.class }, - { FifoScheduler.class } }); + { FifoScheduler.class }, {FairScheduler.class } }); } public TestWorkPreservingRMRestart(Class<?> schedulerClass) { @@ -224,7 +225,11 @@ public class TestWorkPreservingRMRestart assertTrue(schedulerAttempt.getLiveContainers().contains( scheduler.getRMContainer(runningContainer.getContainerId()))); assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); - assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // Until YARN-1959 is resolved + if (scheduler.getClass() != FairScheduler.class) { + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + } // *********** check appSchedulingInfo state *********** assertEquals((1 << 22) + 1, schedulerAttempt.getNewContainerId()); Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Aug 16 21:02:21 2014 @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -1014,4 +1020,874 @@ public class TestCapacityScheduler { // Now with updated ResourceRequest, a container is allocated for AM. Assert.assertTrue(containers.size() == 1); } + + private MockRM setUpMove() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + return rm; + } + + @Test + public void testMoveAppBasic() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "b1"); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAppSameParent() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2"); + assertTrue(appsInA2.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "a2"); + + // check postconditions + appsInA2 = scheduler.getAppsInQueue("a2"); + assertEquals(1, appsInA2.size()); + queue = + scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a2")); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + + @Test + public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=2G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + // available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% + // total cap) + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + // 2GB 1C + Task task_1_1 = + new Task(application_1, priority_0, + new String[] { ResourceRequest.ANY }); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0, host_1 }); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + // prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + + } + + @Test + public void testMoveAppSuccess() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + scheduler.moveApplication(application_0.getApplicationId(), "b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + StringBuilder qState = new StringBuilder(); + qState.append(CapacitySchedulerConfiguration.PREFIX).append(B) + .append(CapacitySchedulerConfiguration.DOT) + .append(CapacitySchedulerConfiguration.STATE); + csConf.set(qState.toString(), QueueState.STOPPED.name()); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + } + + @Test + public void testMoveAppQueueMetricsCheck() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + CSQueue origRootQ = cs.getRootQueue(); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); + int origNumAppsRoot = origRootQ.getNumApplications(); + + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + + CSQueue newRootQ = cs.getRootQueue(); + int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); + int newNumAppsRoot = newRootQ.getNumApplications(); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerLeafQueueInfo origOldA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo origNewA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetOldA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetNewA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues()); + // originally submitted here + assertEquals(1, origOldA1.getNumApplications()); + assertEquals(1, origNumAppsA); + assertEquals(2, origNumAppsRoot); + // after the move + assertEquals(0, origNewA1.getNumApplications()); + assertEquals(1, newNumAppsA); + assertEquals(2, newNumAppsRoot); + // original consumption on a1 + assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory()); + assertEquals(1, origOldA1.getResourcesUsed().getvCores()); + assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move + assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move + // app moved here with live containers + assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory()); + assertEquals(1, targetNewA2.getResourcesUsed().getvCores()); + // it was empty before the move + assertEquals(0, targetOldA2.getNumApplications()); + assertEquals(0, targetOldA2.getResourcesUsed().getMemory()); + assertEquals(0, targetOldA2.getResourcesUsed().getvCores()); + // after the app moved here + assertEquals(1, targetNewA2.getNumApplications()); + // 1 container on original queue before move + assertEquals(1, origOldA1.getNumContainers()); + // after the move the resource released + assertEquals(0, origNewA1.getNumContainers()); + // and moved to the new queue + assertEquals(1, targetNewA2.getNumContainers()); + // which originally didn't have any + assertEquals(0, targetOldA2.getNumContainers()); + // 1 user with 3GB + assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + // user ha no more running app in the orig queue + assertEquals(0, origNewA1.getUsers().getUsersList().size()); + // 1 user with 3GB + assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + } + + private int getNumAppsInQueue(String name, List<CSQueue> queues) { + for (CSQueue queue : queues) { + if (queue.getQueueName().equals(name)) { + return queue.getNumApplications(); + } + } + return -1; + } + + private CapacitySchedulerQueueInfo getQueueInfo(String name, + CapacitySchedulerQueueInfoList info) { + if (info != null) { + for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) { + if (queueInfo.getQueueName().equals(name)) { + return queueInfo; + } else { + CapacitySchedulerQueueInfo result = + getQueueInfo(name, queueInfo.getQueues()); + if (result == null) { + continue; + } + return result; + } + } + } + return null; + } + + @Test + public void testMoveAllApps() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + // check postconditions + Thread.sleep(1000); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("a1", "DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("DOES_NOT_EXIST", "b1"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInQueue() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + scheduler.killAllAppsInQueue("a1"); + + // check postconditions + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.isEmpty()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + try { + scheduler.killAllAppsInQueue("DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Sat Aug 16 21:02:21 2014 @@ -28,10 +28,11 @@ import org.apache.hadoop.yarn.util.resou /** * Dummy implementation of Schedulable for unit testing. */ -public class FakeSchedulable extends Schedulable { +public class FakeSchedulable implements Schedulable { private Resource usage; private Resource minShare; private Resource maxShare; + private Resource fairShare; private ResourceWeights weights; private Priority priority; private long startTime; @@ -90,6 +91,21 @@ public class FakeSchedulable extends Sch } @Override + public Resource getFairShare() { + return this.fairShare; + } + + @Override + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } + + @Override + public boolean isActive() { + return true; + } + + @Override public Resource getDemand() { return null; } Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Sat Aug 16 21:02:21 2014 @@ -62,7 +62,7 @@ public class TestFSLeafQueue { @Test public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); schedulable.addAppSchedulable(app); Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1618417&r1=1618416&r2=1618417&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Sat Aug 16 21:02:21 2014 @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -82,13 +81,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -1539,7 +1536,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1613,9 +1610,9 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1688,8 +1685,8 @@ public class TestFairScheduler extends F "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1731,7 +1728,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1766,7 +1763,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(0, 1, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1830,10 +1827,10 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1952,7 +1949,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -2025,7 +2022,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2066,7 +2063,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2101,7 +2098,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -2143,7 +2140,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2165,10 +2162,10 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2208,13 +2205,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2247,19 +2244,19 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2341,7 +2338,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2353,14 +2350,14 @@ public class TestFairScheduler extends F } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); - Collection<AppSchedulable> runnableApps = + Collection<FSAppAttempt> runnableApps = queue.getRunnableAppSchedulables(); - Collection<AppSchedulable> nonRunnableApps = + Collection<FSAppAttempt> nonRunnableApps = queue.getNonRunnableAppSchedulables(); - assertEquals(runnable, runnableApps.contains(app.getAppSchedulable())); - assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable())); + assertEquals(runnable, runnableApps.contains(app)); + assertEquals(!runnable, nonRunnableApps.contains(app)); } private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, @@ -2465,7 +2462,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 1024 MB memory", @@ -2479,7 +2476,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 1024 MB memory", @@ -2493,7 +2490,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId3 = createAppAttemptId(3, 1); createApplicationWithAMResource(attId3, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application3's AM requests 1024 MB memory", @@ -2529,7 +2526,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createAppAttemptId(4, 1); createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application4's AM requests 2048 MB memory", @@ -2543,7 +2540,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId5 = createAppAttemptId(5, 1); createApplicationWithAMResource(attId5, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5); - FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application5's AM requests 2048 MB memory", @@ -2586,7 +2583,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); - FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", @@ -2677,7 +2674,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 2048 MB memory", @@ -2691,7 +2688,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 2048 MB memory", @@ -2823,7 +2820,7 @@ public class TestFairScheduler extends F // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -3007,7 +3004,7 @@ public class TestFairScheduler extends F assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -3063,7 +3060,7 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -3171,12 +3168,10 @@ public class TestFairScheduler extends F assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); scheduler.moveApplication(appId, "queue2"); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertSame(targetQueue, app.getQueue()); - assertFalse(oldQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); @@ -3224,17 +3219,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId = createSchedulingRequest(1024, 1, "queue1", "user1", 3); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); - assertTrue(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); + assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app)); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); - assertFalse(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertFalse(targetQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app)); + assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); }
