Author: sandy
Date: Fri Nov 29 19:08:48 2013
New Revision: 1546625
URL: http://svn.apache.org/r1546625
Log:
YARN-1241: Include missing files
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1546625&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
Fri Nov 29 19:08:48 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Handles tracking and enforcement for user and queue maxRunningApps
+ * constraints.
+ *
+ * Runnable apps are counted per user in this class and per parent queue via
+ * the queues' own counters. Apps that had to wait are remembered per user so
+ * that they can be made runnable once an app removal frees a slot.
+ */
+public class MaxRunningAppsEnforcer {
+  private final QueueManager queueMgr;
+
+  // Tracks the number of running applications by user.
+  private final Map<String, Integer> usersNumRunnableApps;
+  // Apps currently waiting for a free slot, keyed by user.
+  @VisibleForTesting
+  final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
+
+  /**
+   * @param queueMgr source of the per-user and per-queue maxRunningApps limits
+   */
+  public MaxRunningAppsEnforcer(QueueManager queueMgr) {
+    this.queueMgr = queueMgr;
+    this.usersNumRunnableApps = new HashMap<String, Integer>();
+    this.usersNonRunnableApps = ArrayListMultimap.create();
+  }
+
+  /**
+   * Checks whether making the application runnable would exceed any
+   * maxRunningApps limits.
+   *
+   * @param queue the queue the app would run in; it and all of its ancestors
+   *        are checked
+   * @param user the user the app belongs to
+   * @return true if neither the user limit nor any queue limit is reached
+   */
+  public boolean canAppBeRunnable(FSQueue queue, String user) {
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    if (userNumRunnable == null) {
+      userNumRunnable = 0;
+    }
+    if (userNumRunnable >= queueMgr.getUserMaxApps(user)) {
+      return false;
+    }
+    // Check queue and all parent queues
+    while (queue != null) {
+      int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName());
+      if (queue.getNumRunnableApps() >= queueMaxApps) {
+        return false;
+      }
+      queue = queue.getParent();
+    }
+
+    return true;
+  }
+
+  /**
+   * Tracks the given new runnable app for purposes of maintaining max running
+   * app limits.
+   *
+   * Only the ancestors of the app's leaf queue and the per-user count are
+   * incremented here; the leaf queue's own count is not touched by this
+   * method (presumably it is derived from the leaf's runnable app list --
+   * verify against FSLeafQueue).
+   */
+  public void trackRunnableApp(FSSchedulerApp app) {
+    String user = app.getUser();
+    FSLeafQueue queue = app.getQueue();
+    // Increment running counts for all parent queues
+    FSParentQueue parent = queue.getParent();
+    while (parent != null) {
+      parent.incrementRunnableApps();
+      parent = parent.getParent();
+    }
+
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
+        : userNumRunnable) + 1);
+  }
+
+  /**
+   * Tracks the given new non runnable app so that it can be made runnable when
+   * it would not violate max running app limits.
+   */
+  public void trackNonRunnableApp(FSSchedulerApp app) {
+    String user = app.getUser();
+    usersNonRunnableApps.put(user, app.getAppSchedulable());
+  }
+
+  /**
+   * Updates the relevant tracking variables after a runnable app with the
+   * given queue and user has been removed. Checks to see whether any other
+   * applications are now runnable and makes them so.
+   *
+   * The leaf-queue check below compares against "max - 1", i.e. it expects
+   * the app to have been removed from its leaf queue before this call (the
+   * accompanying test does so) -- confirm for other callers.
+   *
+   * Candidate lists are gathered per queue, then copied before being scanned,
+   * so the cost is proportional to the number of queues under the highest
+   * queue that went from having no slack to having slack plus the number of
+   * waiting apps in the gathered lists.
+   *
+   * @throws IllegalStateException if an app that gets promoted was not being
+   *         tracked as a waiting app of its user
+   */
+  public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+    // Update usersRunnableApps
+    String user = app.getUser();
+    int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+    if (newUserNumRunning == 0) {
+      usersNumRunnableApps.remove(user);
+    } else {
+      usersNumRunnableApps.put(user, newUserNumRunning);
+    }
+
+    // Update runnable app bookkeeping for queues:
+    // childqueueX might have no pending apps itself, but if a queue higher up
+    // in the hierarchy parentqueueY has a maxRunningApps set, an app
+    // completion in childqueueX could allow an app in some other distant
+    // child of parentqueueY to become runnable.
+    // An app removal will only possibly allow another app to become runnable
+    // if the queue was already at its max before the removal.
+    // Thus we find the ancestor queue highest in the tree that was at its
+    // maxRunningApps before the removal.
+    FSLeafQueue queue = app.getQueue();
+    FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
+        queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
+    FSParentQueue parent = queue.getParent();
+    while (parent != null) {
+      if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent
+          .getName())) {
+        highestQueueWithAppsNowRunnable = parent;
+      }
+      parent.decrementRunnableApps();
+      parent = parent.getParent();
+    }
+
+    List<List<AppSchedulable>> appsNowMaybeRunnable =
+        new ArrayList<List<AppSchedulable>>();
+
+    // Compile lists of apps which may now be runnable. Lists are gathered per
+    // queue rather than by walking every non-runnable app in the scheduler.
+    if (highestQueueWithAppsNowRunnable != null) {
+      gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
+          appsNowMaybeRunnable);
+    }
+    if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) {
+      // Multimap.get never returns null; an empty list is harmless here.
+      appsNowMaybeRunnable.add(usersNonRunnableApps.get(user));
+    }
+
+    makeWaitingAppsRunnable(appsNowMaybeRunnable);
+  }
+
+  /**
+   * Scans the candidate lists in start-time order and promotes every app
+   * that no longer violates a limit.
+   *
+   * The iterator works on copies of the lists: promoting an app removes it
+   * from usersNonRunnableApps (whose per-user lists are live views) and
+   * presumably from its leaf queue's non-runnable list, which would otherwise
+   * invalidate the iterator's positions.
+   */
+  private void makeWaitingAppsRunnable(
+      List<List<AppSchedulable>> appsNowMaybeRunnable) {
+    Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
+        appsNowMaybeRunnable);
+    List<FSSchedulerApp> madeRunnable = new ArrayList<FSSchedulerApp>();
+    FSSchedulerApp prev = null;
+    while (iter.hasNext()) {
+      FSSchedulerApp next = iter.next();
+      // The same app can show up in both its queue's list and its user's
+      // list. Skip repeats, including ones that are not adjacent because
+      // other apps share the same start time.
+      if (next == prev || containsInstance(madeRunnable, next)) {
+        continue;
+      }
+
+      if (canAppBeRunnable(next.getQueue(), next.getUser())) {
+        trackRunnableApp(next);
+        AppSchedulable appSched = next.getAppSchedulable();
+        next.getQueue().makeAppRunnable(appSched);
+        if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
+          throw new IllegalStateException("Waiting app " + next
+              + " expected to be in usersNonRunnableApps");
+        }
+        madeRunnable.add(next);
+
+        // No more than one app per list will be able to be made runnable, so
+        // we can stop looking after we've found that many
+        if (madeRunnable.size() >= appsNowMaybeRunnable.size()) {
+          break;
+        }
+      }
+
+      prev = next;
+    }
+  }
+
+  /**
+   * Returns whether the exact given instance (compared by reference) is in
+   * the list.
+   */
+  private static boolean containsInstance(List<FSSchedulerApp> apps,
+      FSSchedulerApp target) {
+    for (FSSchedulerApp candidate : apps) {
+      if (candidate == target) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Stops tracking the given non-runnable app
+   */
+  public void untrackNonRunnableApp(FSSchedulerApp app) {
+    usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
+  }
+
+  /**
+   * Traverses the queue hierarchy under the given queue to gather all lists
+   * of non-runnable applications. Subtrees whose root queue is already at its
+   * maxRunningApps limit are skipped, since none of their apps can start.
+   */
+  private void gatherPossiblyRunnableAppLists(FSQueue queue,
+      List<List<AppSchedulable>> appLists) {
+    if (queue.getNumRunnableApps() <
+        queueMgr.getQueueMaxApps(queue.getName())) {
+      if (queue instanceof FSLeafQueue) {
+        appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables());
+      } else {
+        for (FSQueue child : queue.getChildQueues()) {
+          gatherPossiblyRunnableAppLists(child, appLists);
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes a list of lists, each of which is ordered by start time, and returns
+   * their elements in order of start time.
+   *
+   * The lists are copied on construction, so callers may modify the original
+   * lists while iterating.
+   *
+   * We maintain positions in each of the lists. Each next() call advances
+   * the position in one of the lists. We maintain a heap that orders lists
+   * by the start time of the app in the current position in that list.
+   * This allows us to pick which list to advance in O(log(num lists)) instead
+   * of O(num lists) time. Exhausted lists stay in the heap with a time of
+   * Long.MAX_VALUE.
+   */
+  private static class MultiListStartTimeIterator implements
+      Iterator<FSSchedulerApp> {
+
+    private final List<AppSchedulable>[] appLists;
+    private final int[] curPositionsInAppLists;
+    private final PriorityQueue<IndexAndTime> appListsByCurStartTime;
+
+    @SuppressWarnings("unchecked")
+    public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
+      appLists = new List[appListList.size()];
+      curPositionsInAppLists = new int[appLists.length];
+      appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
+      int i = 0;
+      for (List<AppSchedulable> appList : appListList) {
+        // Snapshot so that later removals from the source list cannot shift
+        // elements underneath our position counters.
+        appLists[i] = new ArrayList<AppSchedulable>(appList);
+        long time = appLists[i].isEmpty() ? Long.MAX_VALUE
+            : appLists[i].get(0).getStartTime();
+        appListsByCurStartTime.add(new IndexAndTime(i, time));
+        i++;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !appListsByCurStartTime.isEmpty()
+          && appListsByCurStartTime.peek().time != Long.MAX_VALUE;
+    }
+
+    @Override
+    public FSSchedulerApp next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      IndexAndTime indexAndTime = appListsByCurStartTime.remove();
+      int nextListIndex = indexAndTime.index;
+      AppSchedulable next = appLists[nextListIndex]
+          .get(curPositionsInAppLists[nextListIndex]);
+      curPositionsInAppLists[nextListIndex]++;
+
+      // Re-insert the list keyed by its new head, or by MAX_VALUE once it is
+      // exhausted.
+      if (curPositionsInAppLists[nextListIndex] <
+          appLists[nextListIndex].size()) {
+        indexAndTime.time = appLists[nextListIndex]
+            .get(curPositionsInAppLists[nextListIndex]).getStartTime();
+      } else {
+        indexAndTime.time = Long.MAX_VALUE;
+      }
+      appListsByCurStartTime.add(indexAndTime);
+
+      return next.getApp();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove not supported");
+    }
+
+    /**
+     * Heap entry pairing a list's index with the start time of the app at
+     * that list's current position. Ordering and equality use the time only.
+     */
+    private static class IndexAndTime implements Comparable<IndexAndTime> {
+      public final int index;
+      public long time;
+
+      public IndexAndTime(int index, long time) {
+        this.index = index;
+        this.time = time;
+      }
+
+      @Override
+      public int compareTo(IndexAndTime o) {
+        return time < o.time ? -1 : (time > o.time ? 1 : 0);
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof IndexAndTime)) {
+          return false;
+        }
+        IndexAndTime other = (IndexAndTime)o;
+        return other.time == time;
+      }
+
+      @Override
+      public int hashCode() {
+        // Fold both halves of the long instead of truncating it.
+        return (int)(time ^ (time >>> 32));
+      }
+    }
+  }
+}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java?rev=1546625&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
Fri Nov 29 19:08:48 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMaxRunningAppsEnforcer {
+ private QueueManager queueManager;
+ private Map<String, Integer> queueMaxApps;
+ private Map<String, Integer> userMaxApps;
+ private MaxRunningAppsEnforcer maxAppsEnforcer;
+ private int appNum;
+ private TestFairScheduler.MockClock clock;
+
+ @Before
+ public void setup() throws Exception {
+ clock = new TestFairScheduler.MockClock();
+ FairScheduler scheduler = mock(FairScheduler.class);
+ when(scheduler.getConf()).thenReturn(
+ new FairSchedulerConfiguration(new Configuration()));
+ when(scheduler.getClock()).thenReturn(clock);
+
+ queueManager = new QueueManager(scheduler);
+ queueManager.initialize();
+
+ queueMaxApps = queueManager.info.queueMaxApps;
+ userMaxApps = queueManager.info.userMaxApps;
+ maxAppsEnforcer = new MaxRunningAppsEnforcer(queueManager);
+ appNum = 0;
+ }
+
+ private FSSchedulerApp addApp(FSLeafQueue queue, String user) {
+ ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
+ ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
+ boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user);
+ FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null);
+ queue.addApp(app, runnable);
+ if (runnable) {
+ maxAppsEnforcer.trackRunnableApp(app);
+ } else {
+ maxAppsEnforcer.trackNonRunnableApp(app);
+ }
+ return app;
+ }
+
+ private void removeApp(FSSchedulerApp app) {
+ app.getQueue().removeApp(app);
+ maxAppsEnforcer.updateRunnabilityOnAppRemoval(app);
+ }
+
+ @Test
+ public void testRemoveDoesNotEnableAnyApp() {
+ FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true);
+ FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true);
+ queueMaxApps.put("root", 2);
+ queueMaxApps.put("root.queue1", 1);
+ queueMaxApps.put("root.queue2", 1);
+ FSSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+ removeApp(app1);
+ assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+ }
+
+ @Test
+ public void testRemoveEnablesAppOnCousinQueue() {
+ FSLeafQueue leaf1 =
queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
+ FSLeafQueue leaf2 =
queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
+ queueMaxApps.put("root.queue1", 2);
+ FSSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+ removeApp(app1);
+ assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(2, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+ }
+
+ @Test
+ public void testRemoveEnablesOneByQueueOneByUser() {
+ FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true);
+ FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true);
+ queueMaxApps.put("root.queue1.leaf1", 2);
+ userMaxApps.put("user1", 1);
+ FSSchedulerApp app1 = addApp(leaf1, "user1");
+ addApp(leaf1, "user2");
+ addApp(leaf1, "user3");
+ addApp(leaf2, "user1");
+ assertEquals(2, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+ removeApp(app1);
+ assertEquals(2, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(0, leaf1.getNonRunnableAppSchedulables().size());
+ assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+ }
+
+ @Test
+ public void testRemoveEnablingOrderedByStartTime() {
+ FSLeafQueue leaf1 =
queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
+ FSLeafQueue leaf2 =
queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
+ queueMaxApps.put("root.queue1", 2);
+ FSSchedulerApp app1 = addApp(leaf1, "user");
+ addApp(leaf2, "user");
+ addApp(leaf2, "user");
+ clock.tick(20);
+ addApp(leaf1, "user");
+ assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
+ assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+ removeApp(app1);
+ assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+ assertEquals(2, leaf2.getRunnableAppSchedulables().size());
+ assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+ }
+
+}