YARN-7003. DRAINING state of queues is not recovered after RM restart. 
Contributed by Tao Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9db20b3c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9db20b3c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9db20b3c

Branch: refs/heads/HDDS-4
Commit: 9db20b3cdf9ca7344a30245d6f81ea84d4452840
Parents: 082bcd4
Author: Weiwei Yang <w...@apache.org>
Authored: Fri May 11 10:47:04 2018 +0800
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Mon May 14 10:31:09 2018 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/AbstractCSQueue.java     | 15 +++++
 .../scheduler/capacity/CapacityScheduler.java   |  7 +++
 .../scheduler/capacity/TestQueueState.java      | 60 ++++++++++++++++++++
 3 files changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db20b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 651d0e9..67b676b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1244,4 +1244,19 @@ public abstract class AbstractCSQueue implements CSQueue 
{
   public Map<String, Float> getUserWeights() {
     return userWeights;
   }
+
+  public void recoverDrainingState() {
+    try {
+      this.writeLock.lock();
+      if (getState() == QueueState.STOPPED) {
+        updateQueueState(QueueState.DRAINING);
+      }
+      LOG.info("Recover draining state for queue " + this.getQueuePath());
+      if (getParent() != null && getParent().getState() == QueueState.STOPPED) 
{
+        ((AbstractCSQueue) getParent()).recoverDrainingState();
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db20b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1d6c104..162d3bb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -808,6 +809,12 @@ public class CapacityScheduler extends
           throw new QueueInvalidException(queueErrorMsg);
         }
       }
+      // When recovering apps in this queue but queue is in STOPPED state,
+      // that means its previous state was DRAINING. So we auto transit
+      // the state to DRAINING for recovery.
+      if (queue.getState() == QueueState.STOPPED) {
+        ((LeafQueue) queue).recoverDrainingState();
+      }
       // Submit to the queue
       try {
         queue.submitApplication(applicationId, user, queueName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db20b3c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index 9f2933e..0a39e99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -197,4 +202,59 @@ public class TestQueueState {
         .thenCallRealMethod();
     return application;
   }
+
+  @Test (timeout = 30000)
+  public void testRecoverDrainingStateAfterRMRestart() throws Exception {
+    // init conf
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration();
+    newConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    newConf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        false);
+    newConf.set(YarnConfiguration.RM_STORE, 
MemoryRMStateStore.class.getName());
+    newConf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{Q1});
+    newConf.setQueues(Q1_PATH, new String[]{Q2});
+    newConf.setCapacity(Q1_PATH, 100);
+    newConf.setCapacity(Q2_PATH, 100);
+
+    // init state store
+    MemoryRMStateStore newMemStore = new MemoryRMStateStore();
+    newMemStore.init(newConf);
+    // init RM & NMs & Nodes
+    MockRM rm = new MockRM(newConf, newMemStore);
+    rm.start();
+    MockNM nm = rm.registerNode("h1:1234", 204800);
+
+    // submit an app, AM is running on nm1
+    RMApp app = rm.submitApp(1024, "appname", "appuser", null, Q2);
+    MockRM.launchAM(app, rm, nm);
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    // update queue state to STOPPED
+    newConf.setState(Q1_PATH, QueueState.STOPPED);
+    CapacityScheduler capacityScheduler =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    capacityScheduler.reinitialize(newConf, rm.getRMContext());
+    // current queue state should be DRAINING
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q1).getState());
+
+    // RM restart
+    rm = new MockRM(newConf, newMemStore);
+    rm.start();
+    rm.registerNode("h1:1234", 204800);
+
+    // queue state should be DRAINING after app recovered
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    capacityScheduler = (CapacityScheduler) rm.getRMContext().getScheduler();
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.DRAINING,
+        capacityScheduler.getQueue(Q1).getState());
+
+    // close rm
+    rm.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to