Author: sandy
Date: Fri May 23 22:52:46 2014
New Revision: 1597209
URL: http://svn.apache.org/r1597209
Log:
YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting
resources when cluster is free (Karthik Kambatla via Sandy Ryza)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1597209&r1=1597208&r2=1597209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri May 23 22:52:46 2014
@@ -104,6 +104,9 @@ Release 2.5.0 - UNRELEASED
YARN-2059. Added admin ACLs support to Timeline Server. (Zhijie Shen via
vinodkv)
+
+ YARN-2073. Fair Scheduler: Add a utilization threshold to prevent
preempting
+ resources when cluster is free (Karthik Kambatla via Sandy Ryza)
OPTIMIZATIONS
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1597209&r1=1597208&r2=1597209&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Fri May 23 22:52:46 2014
@@ -148,7 +148,11 @@ public class FairScheduler extends
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // How often tasks are preempted
+ // Preemption related variables
+ protected boolean preemptionEnabled;
+ protected float preemptionUtilizationThreshold;
+
+ // How often tasks are preempted
protected long preemptionInterval;
// ms to wait before force killing stuff (must be longer than a couple
@@ -158,7 +162,6 @@ public class FairScheduler extends
// Containers whose AMs have been warned that they will be preempted soon.
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
- protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight
adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling
enabled or not
@@ -318,7 +321,7 @@ public class FairScheduler extends
* and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
- if (!preemptionEnabled) {
+ if (!shouldAttemptPreemption()) {
return;
}
@@ -328,10 +331,9 @@ public class FairScheduler extends
}
lastPreemptCheckTime = curTime;
- Resource resToPreempt = Resources.none();
-
+ Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+ Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt,
Resources.none())) {
@@ -1067,6 +1069,22 @@ public class FairScheduler extends
clusterResource, rootMetrics.getAllocatedResources()));
}
+ /**
+ * Check if preemption is enabled and cluster utilization (the larger of the
+ * allocated-to-total ratios for memory and vcores) exceeds the threshold.
+ *
+ * @return true if preemption should be attempted, false otherwise.
+ */
+ private boolean shouldAttemptPreemption() {
+ if (preemptionEnabled) {
+ return (preemptionUtilizationThreshold < Math.max(
+ (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+ (float) rootMetrics.getAllocatedVirtualCores() /
+ clusterResource.getVirtualCores()));
+ }
+ return false;
+ }
+
@Override
public QueueMetrics getRootQueueMetrics() {
return rootMetrics;
@@ -1172,6 +1190,8 @@ public class FairScheduler extends
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
+ preemptionUtilizationThreshold =
+ this.conf.getPreemptionUtilizationThreshold();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1597209&r1=1597208&r2=1597209&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
Fri May 23 22:52:46 2014
@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
+
+ protected static final String PREEMPTION_THRESHOLD =
+ CONF_PREFIX + "preemption.cluster-utilization-threshold";
+ protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
protected static final String PREEMPTION_INTERVAL = CONF_PREFIX +
"preemptionInterval";
protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -185,6 +189,10 @@ public class FairSchedulerConfiguration
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
}
+ public float getPreemptionUtilizationThreshold() {
+ return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
+ }
+
public boolean getAssignMultiple() {
return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1597209&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
Fri May 23 22:52:46 2014
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FairSchedulerTestBase {
+ protected static class MockClock implements Clock { // manually advanced clock
+ private long time = 0; // current mock time, in milliseconds
+ @Override
+ public long getTime() {
+ return time;
+ }
+
+ public void tick(int seconds) {
+ time = time + seconds * 1000L; // long math: an int product could overflow
+ }
+ }
+
+ protected final static String TEST_DIR =
+ new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath();
+
+ private static RecordFactory
+ recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ protected int APP_ID = 1; // Incrementing counter for scheduling apps
+ protected int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
+
+ protected Configuration conf;
+ protected FairScheduler scheduler;
+ protected ResourceManager resourceManager;
+
+ // Helper methods
+ protected Configuration createConfiguration() {
+ Configuration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+ ResourceScheduler.class);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ 1024);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+ return conf;
+ }
+
+ protected ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+ ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
+ return ApplicationAttemptId.newInstance(appIdImpl, attemptId);
+ }
+
+ protected ResourceRequest createResourceRequest(
+ int memory, String host, int priority, int numContainers,
+ boolean relaxLocality) {
+ return createResourceRequest(memory, 1, host, priority, numContainers,
+ relaxLocality);
+ }
+
+ protected ResourceRequest createResourceRequest(
+ int memory, int vcores, String host, int priority, int numContainers,
+ boolean relaxLocality) {
+ ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
+ request.setCapability(BuilderUtils.newResource(memory, vcores));
+ request.setResourceName(host);
+ request.setNumContainers(numContainers);
+ Priority prio = recordFactory.newRecordInstance(Priority.class);
+ prio.setPriority(priority);
+ request.setPriority(prio);
+ request.setRelaxLocality(relaxLocality);
+ return request;
+ }
+
+ /**
+ * Creates a single container priority-1 request and submits to
+ * scheduler.
+ */
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, String queueId, String userId) {
+ return createSchedulingRequest(memory, queueId, userId, 1);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, int vcores, String queueId, String userId) {
+ return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, String queueId, String userId, int numContainers) {
+ return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, int vcores, String queueId, String userId, int
numContainers) {
+ return createSchedulingRequest(memory, vcores, queueId, userId,
numContainers, 1);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, String queueId, String userId, int numContainers, int
priority) {
+ return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+ priority);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, int vcores, String queueId, String userId, int numContainers,
+ int priority) {
+ ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
+ scheduler.addApplication(id.getApplicationId(), queueId, userId);
+ // This conditional is for testAclSubmitApplication where app is rejected
+ // and no app is added.
+ if
(scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+ scheduler.addApplicationAttempt(id, false);
+ }
+ List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+ ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY,
+ priority, numContainers, true);
+ ask.add(request);
+ scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+ return id;
+ }
+
+ protected void createSchedulingRequestExistingApplication(
+ int memory, int priority, ApplicationAttemptId attId) {
+ ResourceRequest request = createResourceRequest(memory,
ResourceRequest.ANY,
+ priority, 1, true);
+ createSchedulingRequestExistingApplication(request, attId);
+ }
+
+ protected void createSchedulingRequestExistingApplication(
+ int memory, int vcores, int priority, ApplicationAttemptId attId) {
+ ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY,
+ priority, 1, true);
+ createSchedulingRequestExistingApplication(request, attId);
+ }
+
+ protected void createSchedulingRequestExistingApplication(
+ ResourceRequest request, ApplicationAttemptId attId) {
+ List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+ ask.add(request);
+ scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
+ }
+}
\ No newline at end of file
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1597209&r1=1597208&r2=1597209&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Fri May 23 22:52:46 2014
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -63,8 +62,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -94,7 +91,6 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
@@ -105,46 +101,14 @@ import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
@SuppressWarnings("unchecked")
-public class TestFairScheduler {
+public class TestFairScheduler extends FairSchedulerTestBase {
+ private final static String ALLOC_FILE =
+ new File(TEST_DIR, "test-queues").getAbsolutePath();
- static class MockClock implements Clock {
- private long time = 0;
- @Override
- public long getTime() {
- return time;
- }
-
- public void tick(int seconds) {
- time = time + seconds * 1000;
- }
-
- }
-
- final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "/tmp")).getAbsolutePath();
-
- final static String ALLOC_FILE = new File(TEST_DIR,
- "test-queues").getAbsolutePath();
-
- private FairScheduler scheduler;
- private ResourceManager resourceManager;
- private Configuration conf;
- private static RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
-
- private int APP_ID = 1; // Incrementing counter for schedling apps
- private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
-
- // HELPER METHODS
@Before
public void setUp() throws IOException {
scheduler = new FairScheduler();
conf = createConfiguration();
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
- 1024);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
- // All tests assume only one assignment per node update
- conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new ResourceManager();
resourceManager.init(conf);
@@ -198,107 +162,6 @@ public class TestFairScheduler {
}
}
- private Configuration createConfiguration() {
- Configuration conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
- ResourceScheduler.class);
- return conf;
- }
-
- private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
- ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
- ApplicationAttemptId attId =
- ApplicationAttemptId.newInstance(appIdImpl, attemptId);
- return attId;
- }
-
- private ResourceRequest createResourceRequest(int memory, String host,
- int priority, int numContainers, boolean relaxLocality) {
- return createResourceRequest(memory, 1, host, priority, numContainers,
- relaxLocality);
- }
-
- private ResourceRequest createResourceRequest(int memory, int vcores, String
host,
- int priority, int numContainers, boolean relaxLocality) {
- ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
- request.setCapability(BuilderUtils.newResource(memory, vcores));
- request.setResourceName(host);
- request.setNumContainers(numContainers);
- Priority prio = recordFactory.newRecordInstance(Priority.class);
- prio.setPriority(priority);
- request.setPriority(prio);
- request.setRelaxLocality(relaxLocality);
- return request;
- }
-
- /**
- * Creates a single container priority-1 request and submits to
- * scheduler.
- */
- private ApplicationAttemptId createSchedulingRequest(int memory, String
queueId,
- String userId) {
- return createSchedulingRequest(memory, queueId, userId, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId) {
- return createSchedulingRequest(memory, vcores, queueId, userId, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, String
queueId,
- String userId, int numContainers) {
- return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId, int numContainers) {
- return createSchedulingRequest(memory, vcores, queueId, userId,
numContainers, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, String
queueId,
- String userId, int numContainers, int priority) {
- return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
- priority);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId, int numContainers, int priority) {
- ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
- scheduler.addApplication(id.getApplicationId(), queueId, userId);
- // This conditional is for testAclSubmitApplication where app is rejected
- // and no app is added.
- if
(scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
- scheduler.addApplicationAttempt(id, false);
- }
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY,
- priority, numContainers, true);
- ask.add(request);
- scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
- return id;
- }
-
- private void createSchedulingRequestExistingApplication(int memory, int
priority,
- ApplicationAttemptId attId) {
- ResourceRequest request = createResourceRequest(memory,
ResourceRequest.ANY,
- priority, 1, true);
- createSchedulingRequestExistingApplication(request, attId);
- }
-
- private void createSchedulingRequestExistingApplication(int memory, int
vcores,
- int priority, ApplicationAttemptId attId) {
- ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY,
- priority, 1, true);
- createSchedulingRequestExistingApplication(request, attId);
- }
-
- private void createSchedulingRequestExistingApplication(ResourceRequest
request,
- ApplicationAttemptId attId) {
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ask.add(request);
- scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
- }
-
// TESTS
@Test(timeout=2000)
@@ -1455,7 +1318,7 @@ public class TestFairScheduler {
assertEquals(
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
-
+
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException
{
scheduler.reinitialize(conf, resourceManager.getRMContext());
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java?rev=1597209&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
Fri May 23 22:52:46 2014
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+ private final static String ALLOC_FILE = new File(TEST_DIR,
+ TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
+
+ private MockClock clock;
+
+ private static class StubbedFairScheduler extends FairScheduler {
+ public int lastPreemptMemory = -1;
+
+ @Override
+ protected void preemptResources(
+ Collection<FSLeafQueue> scheds, Resource toPreempt) {
+ lastPreemptMemory = toPreempt.getMemory();
+ }
+
+ public void resetLastPreemptResources() {
+ lastPreemptMemory = -1;
+ }
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ return conf;
+ }
+
+ @Before
+ public void setup() throws IOException {
+ conf = createConfiguration();
+ clock = new MockClock();
+ }
+
+ @After
+ public void teardown() {
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
+ conf = null;
+ }
+
+ private void startResourceManager(float utilizationThreshold) {
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
+ utilizationThreshold);
+ resourceManager = new MockRM(conf);
+ resourceManager.start();
+
+ assertTrue(
+ resourceManager.getResourceScheduler() instanceof
StubbedFairScheduler);
+ scheduler = (FairScheduler)resourceManager.getResourceScheduler();
+
+ scheduler.setClock(clock);
+ scheduler.UPDATE_INTERVAL = 60 * 1000;
+ }
+
+ private void registerNodeAndSubmitApp(
+ int memory, int vcores, int appContainers, int appMemory) {
+ RMNode node1 = MockNodes.newNodeInfo(
+ 1, Resources.createResource(memory, vcores), 1, "node1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ assertEquals("Incorrect amount of resources in the cluster",
+ memory, scheduler.rootMetrics.getAvailableMB());
+ assertEquals("Incorrect amount of resources in the cluster",
+ vcores, scheduler.rootMetrics.getAvailableVirtualCores());
+
+ createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
+ scheduler.update();
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new
NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+ }
+ assertEquals("app1's request is not met",
+ memory - appContainers * appMemory,
+ scheduler.rootMetrics.getAvailableMB());
+ }
+
+ @Test
+ public void testPreemptionWithFreeResources() throws Exception {
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"default\">");
+ out.println("<maxResources>0mb,0vcores</maxResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>1</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>1</weight>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+ out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+ out.println("</allocations>");
+ out.close();
+
+ startResourceManager(0f);
+ // Create node with 4GB memory and 4 vcores
+ registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
+
+ // Verify submitting another request triggers preemption
+ createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+ scheduler.update();
+ clock.tick(6);
+
+ ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+ scheduler.preemptTasksIfNecessary();
+ assertEquals("preemptResources() should have been called", 1024,
+ ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+
+ resourceManager.stop();
+
+ startResourceManager(0.8f);
+ // Create node with 4GB memory and 4 vcores
+ registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+
+ // Verify submitting another request doesn't trigger preemption
+ createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+ scheduler.update();
+ clock.tick(6);
+
+ ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+ scheduler.preemptTasksIfNecessary();
+ assertEquals("preemptResources() should not have been called", -1,
+ ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+ }
+}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1597209&r1=1597208&r2=1597209&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Fri May 23 22:52:46 2014
@@ -156,6 +156,12 @@ Properties that can be placed in yarn-si
* Whether to use preemption. Note that preemption is experimental in the
current
version. Defaults to false.
+ * <<<yarn.scheduler.fair.preemption.cluster-utilization-threshold>>>
+
+ * The utilization threshold after which preemption kicks in. The
+ utilization is computed as the maximum ratio of usage to capacity among
+ all resources. Defaults to 0.8.
+
* <<<yarn.scheduler.fair.sizebasedweight>>>
* Whether to assign shares to individual apps based on their size, rather
than