YARN-5333. Some recovered apps are put into default queue when RM HA. 
Contributed by Jun Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9a354c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9a354c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9a354c2

Branch: refs/heads/YARN-2915
Commit: d9a354c2f39274b2810144d1ae133201e44e3bfc
Parents: 4a26221
Author: Rohith Sharma K S <rohithsharm...@apache.org>
Authored: Fri Aug 5 21:35:49 2016 +0530
Committer: Rohith Sharma K S <rohithsharm...@apache.org>
Committed: Fri Aug 5 21:35:49 2016 +0530

----------------------------------------------------------------------
 .../server/resourcemanager/AdminService.java    | 118 ++++++++++++-------
 .../scheduler/fair/TestFairScheduler.java       |  69 +++++++++++
 2 files changed, 142 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9a354c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 4d4ad5d..2ec03aa 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.DecommissionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.conf.HAUtil;
@@ -319,15 +318,7 @@ public class AdminService extends CompositeService 
implements
 
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
-    try {
-      rm.transitionToActive();
-    } catch (Exception e) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
-          "", "RM",
-          "Exception transitioning to active");
-      throw new ServiceFailedException(
-          "Error when transitioning to Active mode", e);
-    }
+
     try {
       // call all refresh*s for active RM to get the updated configurations.
       refreshAll();
@@ -337,10 +328,22 @@ public class AdminService extends CompositeService 
implements
           .getDispatcher()
           .getEventHandler()
           .handle(
-          new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
+              new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
+                  e));
       throw new ServiceFailedException(
-          "Error on refreshAll during transistion to Active", e);
+          "Error on refreshAll during transition to Active", e);
     }
+
+    try {
+      rm.transitionToActive();
+    } catch (Exception e) {
+      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
+          "", "RM",
+          "Exception transitioning to active");
+      throw new ServiceFailedException(
+          "Error when transitioning to Active mode", e);
+    }
+
     RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
         "RM");
   }
@@ -408,12 +411,7 @@ public class AdminService extends CompositeService 
implements
     RefreshQueuesResponse response =
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
-      // refresh the reservation system
-      ReservationSystem rSystem = rmContext.getReservationSystem();
-      if (rSystem != null) {
-        rSystem.reinitialize(getConfig(), rmContext);
-      }
+      refreshQueues();
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
           "AdminService");
       return response;
@@ -422,6 +420,15 @@ public class AdminService extends CompositeService 
implements
     }
   }
 
+  private void refreshQueues() throws IOException, YarnException {
+    rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+    // refresh the reservation system
+    ReservationSystem rSystem = rmContext.getReservationSystem();
+    if (rSystem != null) {
+      rSystem.reinitialize(getConfig(), rmContext);
+    }
+  }
+
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
@@ -454,6 +461,13 @@ public class AdminService extends CompositeService 
implements
     }
   }
 
+  private void refreshNodes() throws IOException, YarnException {
+    Configuration conf =
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    rmContext.getNodesListManager().refreshNodes(conf);
+  }
+
   @Override
   public RefreshSuperUserGroupsConfigurationResponse 
refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
@@ -464,6 +478,16 @@ public class AdminService extends CompositeService 
implements
     checkRMStatus(user.getShortUserName(), operation,
             "refresh super-user-groups.");
 
+    refreshSuperUserGroupsConfiguration();
+    RMAuditLogger.logSuccess(user.getShortUserName(),
+        operation, "AdminService");
+
+    return recordFactory.newRecordInstance(
+        RefreshSuperUserGroupsConfigurationResponse.class);
+  }
+
+  private void refreshSuperUserGroupsConfiguration()
+      throws IOException, YarnException {
     // Accept hadoop common configs in core-site.xml as well as RM specific
     // configurations in yarn-site.xml
     Configuration conf =
@@ -472,11 +496,6 @@ public class AdminService extends CompositeService 
implements
             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
     RMServerUtils.processRMProxyUsersConf(conf);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
-    RMAuditLogger.logSuccess(user.getShortUserName(),
-        operation, "AdminService");
-
-    return recordFactory.newRecordInstance(
-        RefreshSuperUserGroupsConfigurationResponse.class);
   }
 
   @Override
@@ -488,10 +507,7 @@ public class AdminService extends CompositeService 
implements
 
     checkRMStatus(user.getShortUserName(), operation, "refresh user-groups.");
 
-    Groups.getUserToGroupsMappingService(
-        getConfiguration(new Configuration(false),
-            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
-
+    refreshUserToGroupsMappings();
     RMAuditLogger.logSuccess(user.getShortUserName(), operation,
             "AdminService");
 
@@ -499,6 +515,12 @@ public class AdminService extends CompositeService 
implements
         RefreshUserToGroupsMappingsResponse.class);
   }
 
+  private void refreshUserToGroupsMappings() throws IOException, YarnException 
{
+    Groups.getUserToGroupsMappingService(
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
+  }
+
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
@@ -541,6 +563,14 @@ public class AdminService extends CompositeService 
implements
 
     checkRMStatus(user.getShortUserName(), operation, "refresh Service ACLs.");
 
+    refreshServiceAcls();
+    RMAuditLogger.logSuccess(user.getShortUserName(), operation,
+            "AdminService");
+
+    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
+  }
+
+  private void refreshServiceAcls() throws IOException, YarnException {
     PolicyProvider policyProvider = RMPolicyProvider.getInstance();
     Configuration conf =
         getConfiguration(new Configuration(false),
@@ -552,11 +582,6 @@ public class AdminService extends CompositeService 
implements
         conf, policyProvider);
     rmContext.getResourceTrackerService().refreshServiceAcls(
         conf, policyProvider);
-
-    RMAuditLogger.logSuccess(user.getShortUserName(), operation,
-            "AdminService");
-
-    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
   }
 
   private synchronized void refreshServiceAcls(Configuration configuration,
@@ -691,18 +716,17 @@ public class AdminService extends CompositeService 
implements
   @VisibleForTesting
   void refreshAll() throws ServiceFailedException {
     try {
-      refreshQueues(RefreshQueuesRequest.newInstance());
-      refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL));
-      refreshSuperUserGroupsConfiguration(
-          RefreshSuperUserGroupsConfigurationRequest.newInstance());
-      refreshUserToGroupsMappings(
-          RefreshUserToGroupsMappingsRequest.newInstance());
+      checkAcls("refreshAll");
+      refreshQueues();
+      refreshNodes();
+      refreshSuperUserGroupsConfiguration();
+      refreshUserToGroupsMappings();
       if (getConfig().getBoolean(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
           false)) {
-        refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
+        refreshServiceAcls();
       }
-      
refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance());
+      refreshClusterMaxPriority();
     } catch (Exception ex) {
       throw new ServiceFailedException(ex.getMessage());
     }
@@ -839,11 +863,7 @@ public class AdminService extends CompositeService 
implements
 
     checkRMStatus(user.getShortUserName(), operation, msg);
     try {
-      Configuration conf =
-          getConfiguration(new Configuration(false),
-              YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-
-      rmContext.getScheduler().setClusterMaxPriority(conf);
+      refreshClusterMaxPriority();
 
       RMAuditLogger
           .logSuccess(user.getShortUserName(), operation, "AdminService");
@@ -854,6 +874,14 @@ public class AdminService extends CompositeService 
implements
     }
   }
 
+  private void refreshClusterMaxPriority() throws IOException, YarnException {
+    Configuration conf =
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+
+    rmContext.getScheduler().setClusterMaxPriority(conf);
+  }
+
   public String getHAZookeeperConnectionState() {
     if (!rmContext.isHAEnabled()) {
       return "ResourceManager HA is not enabled.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9a354c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index dab7312..87670c4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -49,6 +49,7 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetworkTopology;
@@ -74,9 +75,13 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -84,6 +89,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -4648,4 +4654,67 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     long reservedId = reservedContainer1.getContainerId().getContainerId();
     assertEquals(reservedId + 1, maxId);
   }
+
+  @Test(timeout = 120000)
+  public void testRefreshQueuesWhenRMHA() throws Exception {
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+    conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    HAServiceProtocol.StateChangeRequestInfo requestInfo =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues
+    MockRM rm1 = new MockRM(conf, null);
+    rm1.init(conf);
+    rm1.start();
+    rm1.getAdminService().transitionToStandby(requestInfo);
+
+    // 2. add a new queue "test_queue"
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"test_queue\">");
+    out.println("  <maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 3. start a active RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.init(conf);
+    rm2.start();
+
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm.registerNode();
+
+    rm2.getAdminService().transitionToActive(requestInfo);
+
+    // 4. submit a app to the new added queue "test_queue"
+    RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
+    RMAppAttempt attempt0 = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
+    am0.registerAppAttempt();
+    assertEquals("root.test_queue", app.getQueue());
+
+    // 5. Transit rm1 to active, recover app
+    ((RMContextImpl)rm1.getRMContext()).setStateStore(memStore);
+    rm1.getAdminService().transitionToActive(requestInfo);
+    rm1.drainEvents();
+    assertEquals(1, rm1.getRMContext().getRMApps().size());
+    RMApp recoveredApp =
+        rm1.getRMContext().getRMApps().values().iterator().next();
+    assertEquals("root.test_queue", recoveredApp.getQueue());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to