YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache 
when failing over. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/09999d7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/09999d7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/09999d7e

Branch: refs/heads/HDFS-12943
Commit: 09999d7e014fde717e8b122773b68664f4594106
Parents: 725b10e
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Mar 28 11:33:19 2018 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Wed Mar 28 11:33:19 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  9 ++-
 .../yarn/conf/TestYarnConfigurationFields.java  |  2 +
 .../TestFederationRMFailoverProxyProvider.java  | 81 +++++++++++++++++++-
 .../FederationRMFailoverProxyProvider.java      | 11 ++-
 4 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/09999d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 04b2898..1f62bbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3089,15 +3089,18 @@ public class YarnConfiguration extends Configuration {
 
   public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
       FEDERATION_PREFIX + "cache-ttl.secs";
+  // 5 minutes
+  public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
+
+  public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
+      FEDERATION_PREFIX + "flush-cache-for-rm-addr";
+  public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
 
   public static final String FEDERATION_REGISTRY_BASE_KEY =
       FEDERATION_PREFIX + "registry.base-dir";
   public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
       "yarnfederation/";
 
-  // 5 minutes
-  public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
-
   public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
       FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09999d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 9fe4f88..f4d1ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -79,6 +79,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.RM_EPOCH);
     configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09999d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
index e3f9155..0a7ee3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProvider
 import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
@@ -49,7 +50,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -61,12 +66,20 @@ public class TestFederationRMFailoverProxyProvider {
   private FederationStateStore stateStore;
   private final String dummyCapability = "cap";
 
+  private GetClusterMetricsResponse threadResponse;
+
   @Before
   public void setUp() throws IOException, YarnException {
     conf = new YarnConfiguration();
-    stateStore = new MemoryFederationStateStore();
+
+    // Configure Facade cache to use a very long ttl
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 60 * 60);
+
+    stateStore = spy(new MemoryFederationStateStore());
     stateStore.init(conf);
     FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
+    verify(stateStore, times(0))
+        .getSubClusters(any(GetSubClustersInfoRequest.class));
   }
 
   @After
@@ -75,12 +88,25 @@ public class TestFederationRMFailoverProxyProvider {
     stateStore = null;
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testFederationRMFailoverProxyProvider() throws Exception {
+    testProxyProvider(true);
+  }
+
+  @Test (timeout=60000)
+  public void testFederationRMFailoverProxyProviderWithoutFlushFacadeCache()
+      throws Exception {
+    testProxyProvider(false);
+  }
+
+  private void testProxyProvider(boolean facadeFlushCache) throws Exception {
     final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
     final MiniYARNCluster cluster = new MiniYARNCluster(
         "testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
 
+    conf.setBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
+        facadeFlushCache);
+
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
@@ -104,10 +130,16 @@ public class TestFederationRMFailoverProxyProvider {
         .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
             UserGroupInformation.getCurrentUser());
 
+    verify(stateStore, times(1))
+        .getSubClusters(any(GetSubClustersInfoRequest.class));
+
     // client will retry until the rm becomes active.
     GetClusterMetricsResponse response =
         client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
 
+    verify(stateStore, times(1))
+        .getSubClusters(any(GetSubClustersInfoRequest.class));
+
     // validate response
     checkResponse(response);
 
@@ -118,7 +150,50 @@ public class TestFederationRMFailoverProxyProvider {
 
     // Transition rm2 to active;
     makeRMActive(subClusterId, cluster, 1);
-    response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+    verify(stateStore, times(1))
+        .getSubClusters(any(GetSubClustersInfoRequest.class));
+
+    threadResponse = null;
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // In non flush cache case, we will be hitting the cache with old RM
+          // address and keep failing before the cache is flushed
+          threadResponse =
+              client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+        } catch (YarnException | IOException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    thread.start();
+
+    if (!facadeFlushCache) {
+      // Add a wait so that hopefully the thread has started hitting old cached
+      Thread.sleep(500);
+
+      // Should still be hitting cache
+      verify(stateStore, times(1))
+          .getSubClusters(any(GetSubClustersInfoRequest.class));
+
+      // Force flush cache, so that it will pick up the new RM address
+      FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
+          true);
+    }
+
+    // Wait for the thread to finish and grab result
+    thread.join();
+    response = threadResponse;
+
+    if (facadeFlushCache) {
+      verify(stateStore, atLeast(2))
+          .getSubClusters(any(GetSubClustersInfoRequest.class));
+    } else {
+      verify(stateStore, times(2))
+          .getSubClusters(any(GetSubClustersInfoRequest.class));
+    }
 
     // validate response
     checkResponse(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09999d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index c631208..cf6d1ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
   private FederationStateStoreFacade facade;
   private SubClusterId subClusterId;
   private UserGroupInformation originalUser;
-  private boolean federationFailoverEnabled = false;
+  private boolean federationFailoverEnabled;
+  private boolean flushFacadeCacheForYarnRMAddr;
 
   @Override
   public void init(Configuration configuration, RMProxy<T> proxy,
@@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T>
     String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
     Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
     this.subClusterId = SubClusterId.newInstance(clusterId);
-    this.facade = facade.getInstance();
+    this.facade = FederationStateStoreFacade.getInstance();
     if (configuration instanceof YarnConfiguration) {
       this.conf = (YarnConfiguration) configuration;
     }
     federationFailoverEnabled =
         conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
             YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+    flushFacadeCacheForYarnRMAddr =
+        conf.getBoolean(YarnConfiguration.FEDERATION_FLUSH_CACHE_FOR_RM_ADDR,
+            YarnConfiguration.DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR);
 
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T>
     try {
       LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
           subClusterId);
-      subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
+      subClusterInfo = facade.getSubCluster(subClusterId,
+          this.flushFacadeCacheForYarnRMAddr && isFailover);
       // updating the conf with the refreshed RM addresses as proxy
       // creations are based out of conf
       updateRMAddress(subClusterInfo);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to