Author: kasha
Date: Tue Jul 29 17:46:58 2014
New Revision: 1614435
URL: http://svn.apache.org/r1614435
Log:
YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
stopped when the scheduler is stopped. (kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1614435&r1=1614434&r2=1614435&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Jul 29 17:46:58 2014
@@ -53,6 +53,9 @@ Release 2.6.0 - UNRELEASED
YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery.
(Xuan Gong via jianhe)
+ YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
+ stopped when the scheduler is stopped. (kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1614435&r1=1614434&r2=1614435&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Jul 29 17:46:58 2014
@@ -139,8 +139,11 @@ public class FairScheduler extends
private final int UPDATE_DEBUG_FREQUENCY = 5;
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
- private Thread updateThread;
- private Thread schedulingThread;
+ @VisibleForTesting
+ Thread updateThread;
+
+ @VisibleForTesting
+ Thread schedulingThread;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -243,16 +246,21 @@ public class FairScheduler extends
}
/**
- * A runnable which calls {@link FairScheduler#update()} every
+ * Thread which calls {@link FairScheduler#update()} every
* <code>updateInterval</code> milliseconds.
*/
- private class UpdateThread implements Runnable {
+ private class UpdateThread extends Thread {
+
+ @Override
public void run() {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(updateInterval);
update();
preemptTasksIfNecessary();
+ } catch (InterruptedException ie) {
+ LOG.warn("Update thread interrupted. Exiting.");
+ return;
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
@@ -261,6 +269,26 @@ public class FairScheduler extends
}
/**
+ * Thread which attempts scheduling resources continuously,
+ * asynchronous to the node heartbeats.
+ */
+  private class ContinuousSchedulingThread extends Thread {
+
+    /**
+     * Alternates a continuous scheduling pass with a sleep of
+     * {@code getContinuousSchedulingSleepMs()} milliseconds until the
+     * current thread's interrupt flag is observed set. An
+     * {@link InterruptedException} thrown by either the scheduling pass or
+     * the sleep is logged and ends the thread; any other exception is not
+     * caught here and also terminates the thread.
+     */
+    @Override
+    public void run() {
+      try {
+        while (!Thread.currentThread().isInterrupted()) {
+          continuousSchedulingAttempt();
+          Thread.sleep(getContinuousSchedulingSleepMs());
+        }
+      } catch (InterruptedException e) {
+        // Falling out of run() after logging is the exit path.
+        LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
+      }
+    }
+  }
+
+ /**
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
@@ -970,7 +998,7 @@ public class FairScheduler extends
}
}
- void continuousSchedulingAttempt() {
+ void continuousSchedulingAttempt() throws InterruptedException {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
@@ -1229,30 +1257,14 @@ public class FairScheduler extends
throw new IOException("Failed to start FairScheduler", e);
}
- updateThread = new Thread(new UpdateThread());
+ updateThread = new UpdateThread();
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
- schedulingThread = new Thread(
- new Runnable() {
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- continuousSchedulingAttempt();
- Thread.sleep(getContinuousSchedulingSleepMs());
- } catch (InterruptedException e) {
- LOG.error("Continuous scheduling thread interrupted.
Exiting. ",
- e);
- return;
- }
- }
- }
- }
- );
- schedulingThread.setName("ContinuousScheduling");
+ schedulingThread = new ContinuousSchedulingThread();
+ schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1614435&r1=1614434&r2=1614435&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Jul 29 17:46:58 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -3341,4 +3342,28 @@ public class TestFairScheduler extends F
scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue);
assertEquals(ancestorQueue, queue1);
}
+
+  /**
+   * Starts the scheduler with continuous scheduling enabled, checks that the
+   * update and continuous scheduling threads are running, then stops the
+   * scheduler and checks that both threads have terminated.
+   */
+  @Test
+  public void testThreadLifeCycle() throws InterruptedException {
+    conf.setBoolean(
+        FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
+    scheduler.init(conf);
+    scheduler.start();
+
+    Thread updateThread = scheduler.updateThread;
+    Thread schedulingThread = scheduler.schedulingThread;
+
+    assertNotNull("Update thread was not created", updateThread);
+    assertNotNull("Continuous scheduling thread was not created",
+        schedulingThread);
+    assertTrue(updateThread.isAlive());
+    assertTrue(schedulingThread.isAlive());
+
+    scheduler.stop();
+
+    // Give each thread up to 5 s to exit, then assert on the threads'
+    // actual state. A post-decremented retry counter ends at -1 on timeout,
+    // so comparing it against 0 cannot detect a thread that never stops.
+    updateThread.join(5000);
+    schedulingThread.join(5000);
+
+    assertFalse("Update thread is still alive after stop()",
+        updateThread.isAlive());
+    assertFalse("Continuous scheduling thread is still alive after stop()",
+        schedulingThread.isAlive());
+  }
}