YARN-5434. Add -client|server argument for graceful decommmission. Contributed 
by Robert Kanter.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95f2b985
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95f2b985
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95f2b985

Branch: refs/heads/HADOOP-12756
Commit: 95f2b9859718eca12fb3167775cdd2dad25dde25
Parents: 204a205
Author: Junping Du <junping...@apache.org>
Authored: Fri Jul 29 10:26:11 2016 -0700
Committer: Junping Du <junping...@apache.org>
Committed: Fri Jul 29 10:26:11 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/client/cli/RMAdminCLI.java      | 46 ++++++++--
 .../hadoop/yarn/client/cli/TestRMAdminCLI.java  | 92 +++++++++++++++++---
 2 files changed, 115 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/95f2b985/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index aa7fc30..4aa3a14 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -98,11 +98,17 @@ public class RMAdminCLI extends HAAdmin {
               "Reload the queues' acls, states and scheduler specific " +
                   "properties. \n\t\tResourceManager will reload the " +
                   "mapred-queues configuration file."))
-          .put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
+          .put("-refreshNodes",
+              new UsageInfo("[-g [timeout in seconds] -client|server]",
               "Refresh the hosts information at the ResourceManager. Here "
-              + "[-g [timeout in seconds] is optional, if we specify the "
-              + "timeout then ResourceManager will wait for timeout before "
-              + "marking the NodeManager as decommissioned."))
+              + "[-g [timeout in seconds] -client|server] is optional, if we "
+              + "specify the timeout then ResourceManager will wait for "
+              + "timeout before marking the NodeManager as decommissioned."
+              + " The -client|server indicates if the timeout tracking should"
+              + " be handled by the client or the ResourceManager. The client"
+              + "-side tracking is blocking, while the server-side tracking"
+              + " is not. Omitting the timeout, or a timeout of -1, indicates"
+              + " an infinite timeout."))
           .put("-refreshNodesResources", new UsageInfo("",
               "Refresh resources of NodeManagers at the ResourceManager."))
           .put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
@@ -230,7 +236,7 @@ public class RMAdminCLI extends HAAdmin {
     summary.append("The full syntax is: \n\n" +
     "yarn rmadmin" +
       " [-refreshQueues]" +
-      " [-refreshNodes [-g [timeout in seconds]]]" +
+      " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
       " [-refreshNodesResources]" +
       " [-refreshSuperUserGroupsConfiguration]" +
       " [-refreshUserToGroupsMappings]" +
@@ -312,7 +318,12 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
-  private int refreshNodes(long timeout) throws IOException, YarnException {
+  private int refreshNodes(long timeout, String trackingMode)
+      throws IOException, YarnException {
+    if (!"client".equals(trackingMode)) {
+      throw new UnsupportedOperationException(
+          "Only client tracking mode is currently supported.");
+    }
     // Graceful decommissioning with timeout
     ResourceManagerAdministrationProtocol adminProtocol = 
createAdminProtocol();
     RefreshNodesRequest gracefulRequest = RefreshNodesRequest
@@ -721,11 +732,18 @@ public class RMAdminCLI extends HAAdmin {
       } else if ("-refreshNodes".equals(cmd)) {
         if (args.length == 1) {
           exitCode = refreshNodes();
-        } else if (args.length == 3) {
+        } else if (args.length == 3 || args.length == 4) {
           // if the graceful timeout specified
           if ("-g".equals(args[1])) {
-            long timeout = validateTimeout(args[2]);
-            exitCode = refreshNodes(timeout);
+            long timeout = -1;
+            String trackingMode;
+            if (args.length == 4) {
+              timeout = validateTimeout(args[2]);
+              trackingMode = validateTrackingMode(args[3]);
+            } else {
+              trackingMode = validateTrackingMode(args[2]);
+            }
+            exitCode = refreshNodes(timeout, trackingMode);
           } else {
             printUsage(cmd, isHAEnabled);
             return -1;
@@ -838,6 +856,16 @@ public class RMAdminCLI extends HAAdmin {
     return timeout;
   }
 
+  private String validateTrackingMode(String mode) {
+    if ("-client".equals(mode)) {
+      return "client";
+    }
+    if ("-server".equals(mode)) {
+      return "server";
+    }
+    throw new IllegalArgumentException("Invalid mode specified: " + mode);
+  }
+
   @Override
   public void setConf(Configuration conf) {
     if (conf != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95f2b985/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index 1551333..d3161ba 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -255,9 +256,9 @@ public class TestRMAdminCLI {
   }
 
   @Test
-  public void testRefreshNodesWithGracefulTimeout() throws Exception {
+  public void testRefreshNodesGracefulBeforeTimeout() throws Exception {
     // graceful decommission before timeout
-    String[] args = { "-refreshNodes", "-g", "1" };
+    String[] args = {"-refreshNodes", "-g", "1", "-client"};
     CheckForDecommissioningNodesResponse response = Records
         .newRecord(CheckForDecommissioningNodesResponse.class);
     HashSet<NodeId> decomNodes = new HashSet<NodeId>();
@@ -267,30 +268,91 @@ public class TestRMAdminCLI {
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+    verify(admin, never()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
 
+  @Test
+  public void testRefreshNodesGracefulHitTimeout() throws Exception {
     // Forceful decommission when timeout occurs
-    String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" };
-    decomNodes = new HashSet<NodeId>();
+    String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"};
+    HashSet<NodeId> decomNodes = new HashSet<NodeId>();
+    CheckForDecommissioningNodesResponse response = Records
+        .newRecord(CheckForDecommissioningNodesResponse.class);
     response.setDecommissioningNodes(decomNodes);
     decomNodes.add(NodeId.newInstance("node1", 100));
     response.setDecommissioningNodes(decomNodes);
     when(admin.checkForDecommissioningNodes(any(
         CheckForDecommissioningNodesRequest.class))).thenReturn(response);
-    assertEquals(0, rmAdminCLI.run(focefulDecomArgs));
+    assertEquals(0, rmAdminCLI.run(forcefulDecomArgs));
     verify(admin).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
+
+  @Test
+  public void testRefreshNodesGracefulInfiniteTimeout() throws Exception {
+    String[] infiniteTimeoutArgs = {"-refreshNodes", "-g", "-1", "-client"};
+    testRefreshNodesGracefulInfiniteTimeout(infiniteTimeoutArgs);
+  }
+
+  @Test
+  public void testRefreshNodesGracefulNoTimeout() throws Exception {
+    // no timeout (infinite timeout)
+    String[] noTimeoutArgs = {"-refreshNodes", "-g", "-client"};
+    testRefreshNodesGracefulInfiniteTimeout(noTimeoutArgs);
+  }
+
+  private void testRefreshNodesGracefulInfiniteTimeout(String[] args)
+      throws Exception {
+    when(admin.checkForDecommissioningNodes(any(
+        CheckForDecommissioningNodesRequest.class))).thenAnswer(
+        new Answer<CheckForDecommissioningNodesResponse>() {
+            private int count = 5;
+            @Override
+            public CheckForDecommissioningNodesResponse answer(
+                InvocationOnMock invocationOnMock) throws Throwable {
+              CheckForDecommissioningNodesResponse response = Records
+                  .newRecord(CheckForDecommissioningNodesResponse.class);
+              HashSet<NodeId> decomNodes = new HashSet<NodeId>();
+              count--;
+              if (count <= 0) {
+                response.setDecommissioningNodes(decomNodes);
+                return response;
+              } else {
+                decomNodes.add(NodeId.newInstance("node1", 100));
+                response.setDecommissioningNodes(decomNodes);
+                return response;
+              }
+            }
+          });
+    assertEquals(0, rmAdminCLI.run(args));
+    verify(admin, atLeastOnce()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+    verify(admin, never()).refreshNodes(
+        RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
+  }
 
+  @Test
+  public void testRefreshNodesGracefulInvalidArgs() throws Exception {
     // invalid graceful timeout parameter
-    String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" };
+    String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", 
"-client"};
     assertEquals(-1, rmAdminCLI.run(invalidArgs));
 
     // invalid timeout
-    String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" };
+    String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", 
"-client"};
     assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
 
     // negative timeout
-    String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" };
+    String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
     assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
+
+    // server tracking mode
+    String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
+    assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
+
+    // invalid tracking mode
+    String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
+    assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
   }
 
   @Test(timeout=500)
@@ -404,8 +466,8 @@ public class TestRMAdminCLI {
           .toString()
           .contains(
               "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
-              "seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" +
-              "Configuration] [-refreshUserToGroupsMappings] " +
+              "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+              "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
               "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
               "[username]] [-addToClusterNodeLabels " +
               "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
@@ -423,8 +485,8 @@ public class TestRMAdminCLI {
       assertTrue(dataOut
           .toString()
           .contains(
-              "-refreshNodes [-g [timeout in seconds]]: Refresh the hosts 
information at the " +
-              "ResourceManager."));
+              "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+              "Refresh the hosts information at the ResourceManager."));
       assertTrue(dataOut
           .toString()
           .contains(
@@ -456,7 +518,8 @@ public class TestRMAdminCLI {
       testError(new String[] { "-help", "-refreshQueues" },
           "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodes" },
-          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", 
dataErr, 0);
+          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
+          "-client|server]]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodesResources" },
           "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
       testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -495,7 +558,8 @@ public class TestRMAdminCLI {
       assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
       oldOutPrintStream.println(dataOut);
       String expectedHelpMsg = 
-          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in 
seconds]]] "
+          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+              + "seconds] -client|server]] "
               + "[-refreshNodesResources] 
[-refreshSuperUserGroupsConfiguration] "
               + "[-refreshUserToGroupsMappings] "
               + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to