YARN-7509. AsyncScheduleThread and ResourceCommitterService are still running 
after RM is transitioned to standby. (Tao Yang via wangda)

Change-Id: I7477fe355419fd4a0a6e2bdda7319abad4c4c748


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/834e91ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/834e91ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/834e91ee

Branch: refs/heads/YARN-5881
Commit: 834e91ee91d22d74866afbf6252107e969bf8370
Parents: d162252
Author: Wangda Tan <wan...@apache.org>
Authored: Thu Nov 23 19:59:03 2017 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Thu Nov 23 19:59:03 2017 -0800

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |  16 +-
 .../TestRMHAForAsyncScheduler.java              | 155 +++++++++++++++++++
 2 files changed, 164 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/834e91ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ed30ad1..218adf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -459,7 +459,7 @@ public class CapacityScheduler extends
    * Schedule on all nodes by starting at a random point.
    * @param cs
    */
-  static void schedule(CapacityScheduler cs) {
+  static void schedule(CapacityScheduler cs) throws InterruptedException{
     // First randomize the start point
     int current = 0;
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
@@ -475,9 +475,7 @@ public class CapacityScheduler extends
       cs.allocateContainersToNode(node.getNodeID(), false);
     }
 
-    try {
-      Thread.sleep(cs.getAsyncScheduleInterval());
-    } catch (InterruptedException e) {}
+    Thread.sleep(cs.getAsyncScheduleInterval());
   }
 
   static class AsyncScheduleThread extends Thread {
@@ -492,9 +490,9 @@ public class CapacityScheduler extends
 
     @Override
     public void run() {
-      while (true) {
+      while (!Thread.currentThread().isInterrupted()) {
         try {
-          if (!runSchedules.get() || Thread.currentThread().isInterrupted()) {
+          if (!runSchedules.get()) {
             Thread.sleep(100);
           } else {
             // Don't run schedule if we have some pending backlogs already
@@ -505,9 +503,11 @@ public class CapacityScheduler extends
             }
           }
         } catch (InterruptedException ie) {
-          // Do nothing
+          // keep interrupt signal
+          Thread.currentThread().interrupt();
         }
       }
+      LOG.info("AsyncScheduleThread[" + getName() + "] exited!");
     }
 
     public void beginSchedule() {
@@ -546,8 +546,10 @@ public class CapacityScheduler extends
 
         } catch (InterruptedException e) {
           LOG.error(e);
+          Thread.currentThread().interrupt();
         }
       }
+      LOG.info("ResourceCommitterService exited!");
     }
 
     public void addNewCommitRequest(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/834e91ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
new file mode 100644
index 0000000..46d5cda
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMHAForAsyncScheduler extends RMHATestBase {
+
+  @Before
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    confForRM1
+        .setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+            DominantResourceCalculator.class, ResourceCalculator.class);
+    confForRM1.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+    confForRM1.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+
+    confForRM2
+        .setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+            DominantResourceCalculator.class, ResourceCalculator.class);
+    confForRM2.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+    confForRM2.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+  }
+
+  @Test(timeout = 60000)
+  public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
+    // start two RMs, and transit rm1 to active, rm2 to standby
+    startRMs();
+    // register NM
+    rm1.registerNode("h1:1234", 8192, 8);
+    // submit app1 and check
+    RMApp app1 = submitAppAndCheckLaunched(rm1);
+
+    // failover RM1 to RM2; the now-standby RM1 must have stopped its threads
+    explicitFailover();
+    checkAsyncSchedulerThreads(Thread.currentThread());
+
+    // register NM, kill app1
+    rm2.registerNode("h1:1234", 8192, 8);
+    rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.LAUNCHED);
+    rm2.killApp(app1.getApplicationId());
+    // submit app2 and check
+    RMApp app2 = submitAppAndCheckLaunched(rm2);
+
+    // failover RM2 to RM1
+    HAServiceProtocol.StateChangeRequestInfo requestInfo =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    rm2.adminService.transitionToStandby(requestInfo);
+    rm1.adminService.transitionToActive(requestInfo);
+    Assert.assertEquals(HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getHAServiceState());
+    Assert.assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getHAServiceState());
+    // only the active RM1 may still run async-scheduler threads
+    checkAsyncSchedulerThreads(Thread.currentThread());
+
+    // register NM, kill app2
+    rm1.registerNode("h1:1234", 8192, 8);
+    rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.LAUNCHED);
+    rm1.killApp(app2.getApplicationId());
+    // submit app3 and check
+    submitAppAndCheckLaunched(rm1);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
+  private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
+    RMApp app = rm.submitApp(200, "",
+        UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
+        "default", configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, false,
+        false);
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.LAUNCHED);
+    return app;
+  }
+
+  /**
+   * Asserts exactly one AsyncScheduleThread and one ResourceCommitterService.
+   * @param currentThread its thread group is walked up to the root group
+   */
+  private void checkAsyncSchedulerThreads(Thread currentThread) {
+    // Walk up to the root group; enumerate() recurses into all subgroups
+    ThreadGroup threadGroup = currentThread.getThreadGroup();
+    while (threadGroup.getParent() != null) {
+      threadGroup = threadGroup.getParent();
+    }
+    // activeCount() is only an estimate: leave headroom, trust enumerate()
+    int numThreads = threadGroup.enumerate(new Thread[0]) * 0
+        + 0;
+    Thread[] threads = new Thread[threadGroup.activeCount() * 2];
+    numThreads = threadGroup.enumerate(threads);
+    int numAsyncScheduleThread = 0;
+    int numResourceCommitterService = 0;
+    Thread asyncScheduleThread = null;
+    Thread resourceCommitterService = null;
+    for (int i = 0; i < numThreads; i++) {
+      StackTraceElement[] stackTrace = threads[i].getStackTrace();
+      if (stackTrace.length > 0) {
+        String stackBottom = stackTrace[stackTrace.length - 1].toString();
+        if (stackBottom.contains("AsyncScheduleThread.run")) {
+          numAsyncScheduleThread++;
+          asyncScheduleThread = threads[i];
+        } else if (stackBottom.contains("ResourceCommitterService.run")) {
+          numResourceCommitterService++;
+          resourceCommitterService = threads[i];
+        }
+      }
+    }
+    Assert.assertEquals(1, numResourceCommitterService);
+    Assert.assertEquals(1, numAsyncScheduleThread);
+    Assert.assertNotNull(asyncScheduleThread);
+    Assert.assertNotNull(resourceCommitterService);
+  }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to