YARN-8760. [AMRMProxy] Fix concurrent re-register due to YarnRM failover in AMRMClientRelayer. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/59d5af21
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/59d5af21
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/59d5af21

Branch: refs/heads/HDFS-13532
Commit: 59d5af21b7a8f52e8c89cbc2d25fe3d449b2657a
Parents: cc80ac2
Author: Giovanni Matteo Fumarola <gif...@apache.org>
Authored: Mon Oct 1 13:12:38 2018 -0700
Committer: Giovanni Matteo Fumarola <gif...@apache.org>
Committed: Mon Oct 1 13:12:38 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/server/AMRMClientRelayer.java   | 25 ++++++++++++++++++--
 .../yarn/server/TestAMRMClientRelayer.java      | 25 ++++++++++++++++++++
 .../amrmproxy/FederationInterceptor.java        | 12 ++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/59d5af21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index 2621d3e..ca045d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -237,6 +237,27 @@ public class AMRMClientRelayer extends AbstractService
     return this.rmClient.registerApplicationMaster(request);
   }
 
+  /**
+   * After an RM failover, there might be more than one
+   * allocate/finishApplicationMaster call thread (due to RPC timeout and retry)
+   * doing the auto re-register concurrently. As a result, we need to swallow
+   * the "application already registered" exception thrown by the new RM.
+   */
+  private void reRegisterApplicationMaster(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    try {
+      registerApplicationMaster(request);
+    } catch (InvalidApplicationMasterRequestException e) {
+      if (String.valueOf(e.getMessage())
+          .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
+        LOG.info("Concurrent thread successfully re-registered, moving on.");
+      } else {
+        throw e;
+      }
+    }
+  }
+
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request)
@@ -247,7 +268,7 @@ public class AMRMClientRelayer extends AbstractService
       LOG.warn("Out of sync with RM " + rmId
           + " for " + this.appId + ", hence resyncing.");
       // re register with RM
-      registerApplicationMaster(this.amRegistrationRequest);
+      reRegisterApplicationMaster(this.amRegistrationRequest);
       return finishApplicationMaster(request);
     }
   }
@@ -381,7 +402,7 @@ public class AMRMClientRelayer extends AbstractService
       }
 
       // re-register with RM, then retry allocate recursively
-      registerApplicationMaster(this.amRegistrationRequest);
+      reRegisterApplicationMaster(this.amRegistrationRequest);
       // Reset responseId after re-register
       allocateRequest.setResponseId(0);
       allocateResponse = allocate(allocateRequest);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59d5af21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
index 2c016d7..fa46960 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
@@ -64,6 +64,11 @@ public class TestAMRMClientRelayer {
     // Whether this mockRM will throw failover exception upon next heartbeat
     // from AM
     private boolean failover = false;
+
+    // Whether this mockRM will throw application already registered exception
+    // upon next registerApplicationMaster call
+    private boolean throwAlreadyRegister = false;
+
     private int responseIdReset = -1;
     private List<ResourceRequest> lastAsk;
     private List<ContainerId> lastRelease;
@@ -74,6 +79,11 @@ public class TestAMRMClientRelayer {
     public RegisterApplicationMasterResponse registerApplicationMaster(
         RegisterApplicationMasterRequest request)
         throws YarnException, IOException {
+      if (this.throwAlreadyRegister) {
+        this.throwAlreadyRegister = false;
+        throw new InvalidApplicationMasterRequestException(
+            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId");
+      }
       return null;
     }
 
@@ -118,6 +128,10 @@ public class TestAMRMClientRelayer {
       this.failover = true;
     }
 
+    public void setThrowAlreadyRegister() {
+      this.throwAlreadyRegister = true;
+    }
+
     public void setResponseIdReset(int expectedResponseId) {
       this.responseIdReset = expectedResponseId;
     }
@@ -315,4 +329,15 @@ public class TestAMRMClientRelayer {
     response = this.relayer.allocate(getAllocateRequest());
     Assert.assertEquals(this.responseId + 1, response.getResponseId());
   }
+
+  @Test
+  public void testConcurrentReregister() throws YarnException, IOException {
+    // Set RM restart and failover flag
+    this.mockAMS.setFailoverFlag();
+    // Simulate a concurrent thread having already re-registered with new RM
+    this.mockAMS.setThrowAlreadyRegister();
+    // Relayer should swallow the already-registered error and retry finish
+    this.relayer.finishApplicationMaster(null);
+    Assert.assertFalse(this.mockAMS.throwAlreadyRegister);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59d5af21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index c02296d..4267945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -184,6 +184,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private volatile boolean justRecovered;
 
+  /** If true, allocate skips processing and returns a dummy response. */
+  private volatile boolean finishAMCalled;
+
   /**
    * Used to keep track of the container Id and the sub cluster RM that created
    * the container, so that we know which sub-cluster to forward later requests
@@ -230,6 +233,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
     this.justRecovered = false;
+    this.finishAMCalled = false;
   }
 
   /**
@@ -576,6 +580,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               + ". AM should re-register and full re-send pending requests.");
     }
 
+    if (this.finishAMCalled) {
+      LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
+          + "processing and return dummy response", this.attemptId);
+      return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+    }
+
     // Check responseId and handle duplicate heartbeat exactly same as RM
     synchronized (this.lastAllocateResponseLock) {
       LOG.info("Heartbeat from " + this.attemptId + " with responseId "
@@ -664,6 +674,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       FinishApplicationMasterRequest request)
       throws YarnException, IOException {
 
+    this.finishAMCalled = true;
+
     // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
     boolean failedToUnRegister = false;
     ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to