Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Aug 18 18:41:31 2014 @@ -35,7 +35,7 @@ public class FSSchedulerNode extends Sch private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private AppSchedulable reservedAppSchedulable; + private FSAppAttempt reservedAppSchedulable; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); @@ -76,7 +76,7 @@ public class FSSchedulerNode extends Sch " on node " + this + " for application " + application); } setReservedContainer(container); - this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable(); + this.reservedAppSchedulable = (FSAppAttempt) application; } @Override @@ -98,7 +98,7 @@ public class FSSchedulerNode extends Sch this.reservedAppSchedulable = null; } - public synchronized AppSchedulable getReservedAppSchedulable() { + public synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } }
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Aug 18 18:41:31 2014 @@ -117,7 +117,7 @@ import com.google.common.base.Preconditi @Unstable @SuppressWarnings("unchecked") public class FairScheduler extends - AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> { + AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> { private FairSchedulerConfiguration conf; private Resource incrAllocation; @@ -432,8 +432,8 @@ public class FairScheduler extends try { // Reset preemptedResource for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().resetPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.resetPreemptedResources(); } } @@ -453,8 +453,8 @@ public class FairScheduler extends } finally { // Clear preemptedResources for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().clearPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.clearPreemptedResources(); } } } @@ -465,7 +465,7 @@ public class FairScheduler extends protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); - FSSchedulerApp app = getSchedulerApp(appAttemptId); + FSAppAttempt app = getSchedulerApp(appAttemptId); FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + @@ -490,7 +490,7 @@ public class FairScheduler extends (getClock().getTime() - time) + "ms)"); } } else { - // track the request in the FSSchedulerApp itself + // track the request in the FSAppAttempt itself app.addPreemption(container, getClock().getTime()); } } @@ -541,7 +541,7 @@ public class FairScheduler extends } // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(AppSchedulable app) { + public synchronized ResourceWeights getAppWeight(FSAppAttempt app) { double weight = 1.0; if (sizeBasedWeight) { // Set weight based on current memory demand @@ -636,8 +636,8 @@ public class FairScheduler extends return; } - SchedulerApplication<FSSchedulerApp> application = - new SchedulerApplication<FSSchedulerApp>(queue, user); + SchedulerApplication<FSAppAttempt> application = + new SchedulerApplication<FSAppAttempt>(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); @@ -661,13 +661,13 @@ public class FairScheduler extends ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, + FSAppAttempt attempt = + new FSAppAttempt(this, applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); if (transferStateFromPreviousAttempt) { @@ -742,7 +742,7 @@ public class FairScheduler extends private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); @@ -757,9 +757,9 @@ public class FairScheduler extends RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApplication<FSSchedulerApp> application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -820,7 +820,7 @@ public class FairScheduler extends Container container = rmContainer.getContainer(); // Get the application for the finished container - FSSchedulerApp application = + FSAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -835,8 +835,7 @@ public class FairScheduler extends FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); + application.unreserve(rmContainer.getReservedPriority(), node); } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -896,7 +895,7 @@ public class FairScheduler extends List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); + FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -1066,13 +1065,13 @@ public class FairScheduler extends // 1. Check for reserved applications // 2. Schedule if there are no reservations - AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; @@ -1080,7 +1079,7 @@ public class FairScheduler extends // Reservation exists; try to fulfill the reservation if (LOG.isDebugEnabled()) { LOG.debug("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } @@ -1105,8 +1104,8 @@ public class FairScheduler extends updateRootQueueMetrics(); } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId); + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { + return super.getApplicationAttempt(appAttemptId); } /** @@ -1151,6 +1150,8 @@ public class FairScheduler extends } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { @@ -1268,8 +1269,8 @@ public class FairScheduler extends fsOpDurations = FSOpDurations.getInstance(true); // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>(); + this.applications = new ConcurrentHashMap< + ApplicationId, SchedulerApplication<FSAppAttempt>>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1369,7 +1370,7 @@ public class FairScheduler extends @Override public List<QueueUserACLInfo> getQueueUserAclInfo() { - UserGroupInformation user = null; + UserGroupInformation user; try { user = UserGroupInformation.getCurrentUser(); } catch (IOException ioe) { @@ -1431,11 +1432,11 @@ public class FairScheduler extends @Override public synchronized String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication<FSSchedulerApp> app = applications.get(appId); + SchedulerApplication<FSAppAttempt> app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } - FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt(); + FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); @@ -1448,8 +1449,7 @@ public class FairScheduler extends return oldQueue.getQueueName(); } - if (oldQueue.getRunnableAppSchedulables().contains( - attempt.getAppSchedulable())) { + if (oldQueue.getRunnableAppSchedulables().contains(attempt)) { verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); } @@ -1458,7 +1458,7 @@ public class FairScheduler extends } } - private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); ApplicationAttemptId appAttId = app.getApplicationAttemptId(); @@ -1495,8 +1495,8 @@ public class FairScheduler extends * Helper for moveApplication, which has appropriate synchronization, so all * operations will be atomic. */ - private void executeMove(SchedulerApplication<FSSchedulerApp> app, - FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { + private void executeMove(SchedulerApplication<FSAppAttempt> app, + FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Mon Aug 18 18:41:31 2014 @@ -25,15 +25,15 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability.Unstable; /** - * Order {@link AppSchedulable} objects by priority and then by submit time, as + * Order {@link FSAppAttempt} objects by priority and then by submit time, as * in the default scheduler in Hadoop. */ @Private @Unstable -public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable { +public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable { private static final long serialVersionUID = 3428835083489547918L; - public int compare(AppSchedulable a1, AppSchedulable a2) { + public int compare(FSAppAttempt a1, FSAppAttempt a2) { int res = a1.getPriority().compareTo(a2.getPriority()); if (res == 0) { if (a1.getStartTime() < a2.getStartTime()) { @@ -44,7 +44,7 @@ public class FifoAppComparator implement } if (res == 0) { // If there is a tie, break it by app ID to get a deterministic order - res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId()); + res = a1.getApplicationId().compareTo(a2.getApplicationId()); } return res; } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Mon Aug 18 18:41:31 2014 @@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer { // Tracks the number of running applications by user. private final Map<String, Integer> usersNumRunnableApps; @VisibleForTesting - final ListMultimap<String, AppSchedulable> usersNonRunnableApps; + final ListMultimap<String, FSAppAttempt> usersNonRunnableApps; public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; @@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer { * Tracks the given new runnable app for purposes of maintaining max running * app limits. */ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer { * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSAppAttempt app) { String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); + usersNonRunnableApps.put(user, app); } /** @@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer { * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // childqueueX might have no pending apps itself, but if a queue higher up @@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer { parent = parent.getParent(); } - List<List<AppSchedulable>> appsNowMaybeRunnable = - new ArrayList<List<AppSchedulable>>(); + List<List<FSAppAttempt>> appsNowMaybeRunnable = + new ArrayList<List<FSAppAttempt>>(); // Compile lists of apps which may now be runnable // We gather lists instead of building a set of all non-runnable apps so @@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer { userNumRunning = 0; } if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user); + List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); } } // Scan through and check whether this means that any apps are now runnable - Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( + Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>(); + FSAppAttempt prev = null; + List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>(); while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSAppAttempt next = iter.next(); if (next == prev) { continue; } if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); + FSAppAttempt appSched = next; next.getQueue().getRunnableAppSchedulables().add(appSched); noLongerPendingApps.add(appSched); @@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer { // We remove the apps from their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + for (FSAppAttempt appSched : noLongerPendingApps) { + if (!appSched.getQueue().getNonRunnableAppSchedulables() .remove(appSched)) { LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " + "usersNonRunnableApps, but was not. This should never happen."); } @@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer { * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. */ - public void untrackRunnableApp(FSSchedulerApp app) { + public void untrackRunnableApp(FSAppAttempt app) { // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer { /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + public void untrackNonRunnableApp(FSAppAttempt app) { + usersNonRunnableApps.remove(app.getUser(), app); } /** @@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer { * of non-runnable applications. */ private void gatherPossiblyRunnableAppLists(FSQueue queue, - List<List<AppSchedulable>> appLists) { + List<List<FSAppAttempt>> appLists) { if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { @@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer { * of O(num lists) time. */ static class MultiListStartTimeIterator implements - Iterator<FSSchedulerApp> { + Iterator<FSAppAttempt> { - private List<AppSchedulable>[] appLists; + private List<FSAppAttempt>[] appLists; private int[] curPositionsInAppLists; private PriorityQueue<IndexAndTime> appListsByCurStartTime; @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) { + public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) { appLists = appListList.toArray(new List[appListList.size()]); curPositionsInAppLists = new int[appLists.length]; appListsByCurStartTime = new PriorityQueue<IndexAndTime>(); @@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer { } @Override - public FSSchedulerApp next() { + public FSAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] + FSAppAttempt next = appLists[nextListIndex] .get(curPositionsInAppLists[nextListIndex]); curPositionsInAppLists[nextListIndex]++; @@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer { } appListsByCurStartTime.add(indexAndTime); - return next.getApp(); + return next; } @Override Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Mon Aug 18 18:41:31 2014 @@ -48,7 +48,7 @@ public class NewAppWeightBooster extends super.setConf(conf); } - public double adjustWeight(AppSchedulable app, double curWeight) { + public double adjustWeight(FSAppAttempt app, double curWeight) { long start = app.getStartTime(); long now = System.currentTimeMillis(); if (now - start < duration) { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Aug 18 18:41:31 2014 @@ -27,20 +27,14 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.util.resource.Resources; /** - * A Schedulable represents an entity that can launch tasks, such as a job - * or a queue. It provides a common interface so that algorithms such as fair - * sharing can be applied both within a queue and across queues. There are - * currently two types of Schedulables: JobSchedulables, which represent a - * single job, and QueueSchedulables, which allocate among jobs in their queue. - * - * Separate sets of Schedulables are used for maps and reduces. Each queue has - * both a mapSchedulable and a reduceSchedulable, and so does each job. + * A Schedulable represents an entity that can be scheduled such as an + * application or a queue. It provides a common interface so that algorithms + * such as fair sharing can be applied both within a queue and across queues. * * A Schedulable is responsible for three roles: - * 1) It can launch tasks through assignTask(). - * 2) It provides information about the job/queue to the scheduler, including: + * 1) Assign resources through {@link #assignContainer}. + * 2) It provides information about the app/queue to the scheduler, including: * - Demand (maximum number of tasks required) - * - Number of currently running tasks * - Minimum share (for queues) * - Job/queue weight (for fair sharing) * - Start time and priority (for FIFO) @@ -57,81 +51,61 @@ import org.apache.hadoop.yarn.util.resou */ @Private @Unstable -public abstract class Schedulable { - /** Fair share assigned to this Schedulable */ - private Resource fairShare = Resources.createResource(0); - +public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ - public abstract String getName(); + public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ - public abstract Resource getDemand(); + public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ - public abstract Resource getResourceUsage(); + public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ - public abstract Resource getMinShare(); + public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ - public abstract Resource getMaxShare(); + public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ - public abstract ResourceWeights getWeights(); + public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ - public abstract long getStartTime(); + public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ - public abstract Priority getPriority(); + public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ - public abstract void updateDemand(); + public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node); + public Resource assignContainer(FSSchedulerNode node); /** * Preempt a container from this Schedulable if possible. */ - public abstract RMContainer preemptContainer(); - - /** Assign a fair share to this Schedulable. */ - public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - } + public RMContainer preemptContainer(); /** Get the fair share assigned to this Schedulable. */ - public Resource getFairShare() { - return fairShare; - } + public Resource getFairShare(); + + /** Assign a fair share to this Schedulable. */ + public void setFairShare(Resource fairShare); /** * Returns true if queue has atleast one app running. Always returns true for * AppSchedulables. */ - public boolean isActive() { - if (this instanceof FSQueue) { - FSQueue queue = (FSQueue) this; - return queue.getNumRunnableApps() > 0; - } - return true; - } - - /** Convenient toString implementation for debugging. */ - @Override - public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); - } + public boolean isActive(); } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Mon Aug 18 18:41:31 2014 @@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab @Private @Unstable public interface WeightAdjuster { - public double adjustWeight(AppSchedulable app, double curWeight); + public double adjustWeight(FSAppAttempt app, double curWeight); } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Mon Aug 18 18:41:31 2014 @@ -46,8 +46,7 @@ public class FairSchedulerInfo extends S } public int getAppFairShare(ApplicationAttemptId appAttemptId) { - return scheduler.getSchedulerApp(appAttemptId). - getAppSchedulable().getFairShare().getMemory(); + return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory(); } public FairSchedulerQueueInfo getRootQueueInfo() { Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java Mon Aug 18 18:41:31 2014 @@ -24,7 +24,8 @@ import javax.xml.bind.annotation.XmlAcce import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; @@ -39,9 +40,9 @@ public class FairSchedulerLeafQueueInfo public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { super(queue, scheduler); - Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables(); - for (AppSchedulable app : apps) { - if (app.getApp().isPending()) { + Collection<FSAppAttempt> apps = queue.getRunnableAppSchedulables(); + for (FSAppAttempt app : apps) { + if (app.isPending()) { numPendingApps++; } else { numActiveApps++; Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Mon Aug 18 18:41:31 2014 @@ -27,7 +27,10 @@ import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AuthenticationFilterInitializer; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,8 +42,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -235,4 +240,75 @@ public class TestResourceManager { } } + @Test(timeout = 50000) + public void testFilterOverrides() throws Exception { + String filterInitializerConfKey = "hadoop.http.filter.initializers"; + String[] filterInitializers = + { + AuthenticationFilterInitializer.class.getName(), + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + "," + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + RMAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + ", " + + this.getClass().getName() }; + for (String filterInitializer : filterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + conf.set("hadoop.security.authentication", "kerberos"); + conf.set("hadoop.http.authentication.type", "kerberos"); + try { + try { + UserGroupInformation.setConfiguration(conf); + } catch (Exception e) { + // ignore we just care about getting true for + // isSecurityEnabled() + LOG.info("Got expected exception"); + } + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.contains(this.getClass().getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + this.getClass().getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } + } + + // simple mode overrides + String[] simpleFilterInitializers = + { "", StaticUserWebFilter.class.getName() }; + for (String filterInitializer : simpleFilterInitializers) { + resourceManager = new ResourceManager(); + Configuration conf = new YarnConfiguration(); + conf.set(filterInitializerConfKey, filterInitializer); + try { + UserGroupInformation.setConfiguration(conf); + resourceManager.init(conf); + resourceManager.startWepApp(); + } catch (RuntimeException e) { + // Exceptions are expected because we didn't setup everything + // just want to test filter settings + String tmp = resourceManager.getConfig().get(filterInitializerConfKey); + if (filterInitializer.equals(StaticUserWebFilter.class.getName())) { + Assert.assertEquals(RMAuthenticationFilterInitializer.class.getName() + + "," + StaticUserWebFilter.class.getName(), tmp); + } else { + Assert.assertEquals( + RMAuthenticationFilterInitializer.class.getName(), tmp); + } + resourceManager.stop(); + } + } + } + } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Mon Aug 18 18:41:31 2014 @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -107,7 +108,7 @@ public class TestWorkPreservingRMRestart @Parameterized.Parameters public static Collection<Object[]> getTestParameters() { return Arrays.asList(new Object[][] { { CapacityScheduler.class }, - { FifoScheduler.class } }); + { FifoScheduler.class }, {FairScheduler.class } }); } public TestWorkPreservingRMRestart(Class<?> schedulerClass) { @@ -224,7 +225,11 @@ public class TestWorkPreservingRMRestart assertTrue(schedulerAttempt.getLiveContainers().contains( scheduler.getRMContainer(runningContainer.getContainerId()))); assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); - assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // Until YARN-1959 is resolved + if (scheduler.getClass() != FairScheduler.class) { + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + } // *********** check appSchedulingInfo state *********** assertEquals((1 << 22) + 1, schedulerAttempt.getNewContainerId()); Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Mon Aug 18 18:41:31 2014 @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -1014,4 +1020,874 @@ public class TestCapacityScheduler { // Now with updated ResourceRequest, a container is allocated for AM. Assert.assertTrue(containers.size() == 1); } + + private MockRM setUpMove() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + return rm; + } + + @Test + public void testMoveAppBasic() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "b1"); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAppSameParent() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2"); + assertTrue(appsInA2.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "a2"); + + // check postconditions + appsInA2 = scheduler.getAppsInQueue("a2"); + assertEquals(1, appsInA2.size()); + queue = + scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a2")); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + + @Test + public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=2G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + // available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% + // total cap) + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + // 2GB 1C + Task task_1_1 = + new Task(application_1, priority_0, + new String[] { ResourceRequest.ANY }); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0, host_1 }); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + // prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + + } + + @Test + public void testMoveAppSuccess() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + scheduler.moveApplication(application_0.getApplicationId(), "b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + StringBuilder qState = new StringBuilder(); + qState.append(CapacitySchedulerConfiguration.PREFIX).append(B) + .append(CapacitySchedulerConfiguration.DOT) + .append(CapacitySchedulerConfiguration.STATE); + csConf.set(qState.toString(), QueueState.STOPPED.name()); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + } + + @Test + public void testMoveAppQueueMetricsCheck() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + CSQueue origRootQ = cs.getRootQueue(); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); + int origNumAppsRoot = origRootQ.getNumApplications(); + + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + + CSQueue newRootQ = cs.getRootQueue(); + int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); + int newNumAppsRoot = newRootQ.getNumApplications(); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerLeafQueueInfo origOldA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo origNewA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetOldA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetNewA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues()); + // originally submitted here + assertEquals(1, origOldA1.getNumApplications()); + assertEquals(1, origNumAppsA); + assertEquals(2, origNumAppsRoot); + // after the move + assertEquals(0, origNewA1.getNumApplications()); + assertEquals(1, newNumAppsA); + assertEquals(2, newNumAppsRoot); + // original consumption on a1 + assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory()); + assertEquals(1, origOldA1.getResourcesUsed().getvCores()); + assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move + assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move + // app moved here with live containers + assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory()); + assertEquals(1, targetNewA2.getResourcesUsed().getvCores()); + // it was empty before the move + assertEquals(0, targetOldA2.getNumApplications()); + assertEquals(0, targetOldA2.getResourcesUsed().getMemory()); + assertEquals(0, targetOldA2.getResourcesUsed().getvCores()); + // after the app moved here + assertEquals(1, targetNewA2.getNumApplications()); + // 1 container on original queue before move + assertEquals(1, origOldA1.getNumContainers()); + // after the move the resource released + assertEquals(0, origNewA1.getNumContainers()); + // and moved to the new queue + assertEquals(1, targetNewA2.getNumContainers()); + // which originally didn't have any + assertEquals(0, targetOldA2.getNumContainers()); + // 1 user with 3GB + assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + // user ha no more running app in the orig queue + assertEquals(0, origNewA1.getUsers().getUsersList().size()); + // 1 user with 3GB + assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + } + + private int getNumAppsInQueue(String name, List<CSQueue> queues) { + for (CSQueue queue : queues) { + if (queue.getQueueName().equals(name)) { + return queue.getNumApplications(); + } + } + return -1; + } + + private CapacitySchedulerQueueInfo getQueueInfo(String name, + CapacitySchedulerQueueInfoList info) { + if (info != null) { + for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) { + if (queueInfo.getQueueName().equals(name)) { + return queueInfo; + } else { + CapacitySchedulerQueueInfo result = + getQueueInfo(name, queueInfo.getQueues()); + if (result == null) { + continue; + } + return result; + } + } + } + return null; + } + + @Test + public void testMoveAllApps() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + // check postconditions + Thread.sleep(1000); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("a1", "DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("DOES_NOT_EXIST", "b1"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInQueue() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + scheduler.killAllAppsInQueue("a1"); + + // check postconditions + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.isEmpty()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + try { + scheduler.killAllAppsInQueue("DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Mon Aug 18 18:41:31 2014 @@ -28,10 +28,11 @@ import org.apache.hadoop.yarn.util.resou /** * Dummy implementation of Schedulable for unit testing. */ -public class FakeSchedulable extends Schedulable { +public class FakeSchedulable implements Schedulable { private Resource usage; private Resource minShare; private Resource maxShare; + private Resource fairShare; private ResourceWeights weights; private Priority priority; private long startTime; @@ -90,6 +91,21 @@ public class FakeSchedulable extends Sch } @Override + public Resource getFairShare() { + return this.fairShare; + } + + @Override + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } + + @Override + public boolean isActive() { + return true; + } + + @Override public Resource getDemand() { return null; } Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1618700&r1=1618699&r2=1618700&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Mon Aug 18 18:41:31 2014 @@ -62,7 +62,7 @@ public class TestFSLeafQueue { @Test public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); schedulable.addAppSchedulable(app);
