YARN-6896. Federation: routing REST invocations transparently to multiple RMs 
(part 1 - basic execution). (Contributed by Giovanni Matteo Fumarola via curino)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc59b5fb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc59b5fb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc59b5fb

Branch: refs/heads/HDFS-10467
Commit: cc59b5fb26ccf58dffcd8850fa12ec65250f127d
Parents: 0996acd
Author: Carlo Curino <cur...@apache.org>
Authored: Fri Aug 11 15:58:01 2017 -0700
Committer: Carlo Curino <cur...@apache.org>
Committed: Fri Aug 11 15:58:01 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../webapp/DefaultRequestInterceptorREST.java   |  16 +-
 .../webapp/FederationInterceptorREST.java       | 750 +++++++++++++++++++
 .../webapp/BaseRouterWebServicesTest.java       |  37 +-
 .../MockDefaultRequestInterceptorREST.java      | 136 ++++
 .../webapp/TestFederationInterceptorREST.java   | 379 ++++++++++
 .../TestFederationInterceptorRESTRetry.java     | 274 +++++++
 .../TestableFederationInterceptorREST.java      |  54 ++
 .../src/site/markdown/Federation.md             |   2 +-
 10 files changed, 1646 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cd4d569..8acaef8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2721,6 +2721,16 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.router.webapp."
           + "DefaultRequestInterceptorREST";
 
+  /**
+   * The interceptor class used in FederationInterceptorREST to communicate with
+   * each SubCluster.
+   */
+  public static final String ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
+      ROUTER_WEBAPP_PREFIX + "default-interceptor-class";
+  public static final String DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
+      "org.apache.hadoop.yarn.server.router.webapp."
+          + "DefaultRequestInterceptorREST";
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index b9ad31a..91a8b0a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -81,6 +81,8 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
         .add(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.ROUTER_RMADMIN_ADDRESS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
 
     // Federation policies configs to be ignored
     configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index aa8e3eb..abd8ca6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@@ -66,10 +67,23 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
  * implementation that simply forwards the client requests to the resource
  * manager.
  */
-public final class DefaultRequestInterceptorREST
+public class DefaultRequestInterceptorREST
     extends AbstractRESTRequestInterceptor {
 
   private String webAppAddress;
+  private SubClusterId subClusterId = null;
+
+  /**
+   * Sets the web application address stored by this interceptor.
+   *
+   * @param webAppAddress the address to store
+   */
+  public void setWebAppAddress(String webAppAddress) {
+    this.webAppAddress = webAppAddress;
+  }
+
+  /**
+   * Sets the SubCluster identifier associated with this interceptor.
+   *
+   * @param scId the SubCluster identifier to store
+   */
+  protected void setSubClusterId(SubClusterId scId) {
+    this.subClusterId = scId;
+  }
+
+  /**
+   * Returns the SubCluster identifier associated with this interceptor.
+   *
+   * @return the stored SubCluster identifier, or null if none has been set
+   */
+  protected SubClusterId getSubClusterId() {
+    return this.subClusterId;
+  }
 
   @Override
   public void init(String user) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
new file mode 100644
index 0000000..8ecc19d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
+ * implementation for federation of YARN RM and scaling an application across
+ * multiple YARN SubClusters. All the federation specific implementation is
+ * encapsulated in this class. This is always the last interceptor in the chain.
+ */
+public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationInterceptorREST.class);
+
+  private int numSubmitRetries;
+  private FederationStateStoreFacade federationFacade;
+  private Random rand;
+  private RouterPolicyFacade policyFacade;
+
+  private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
+
+  /**
+   * Initializes the federation state of this interceptor: the state-store
+   * facade, the random generator used to pick SubClusters, the router policy
+   * facade, the number of submission retries and the (initially empty)
+   * per-SubCluster interceptor cache.
+   *
+   * @param user the user this interceptor is initialized for; not read by
+   *          this method
+   */
+  @Override
+  public void init(String user) {
+    federationFacade = FederationStateStoreFacade.getInstance();
+    rand = new Random(System.currentTimeMillis());
+
+    final Configuration conf = this.getConf();
+
+    try {
+      policyFacade = new RouterPolicyFacade(conf, federationFacade,
+          this.federationFacade.getSubClusterResolver(), null);
+    } catch (FederationPolicyInitializationException e) {
+      // Initialization stays best-effort (policyFacade remains null), but the
+      // exception itself is logged so the root cause and stack trace are kept.
+      LOG.error(e.getMessage(), e);
+    }
+
+    numSubmitRetries =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+
+    interceptors = new HashMap<>();
+  }
+
+  /**
+   * Picks, at random, one of the given active SubClusters that is not
+   * blacklisted.
+   *
+   * @param activeSubclusters the active SubClusters, keyed by id
+   * @param blackListSubClusters the SubClusters to exclude; may be null
+   * @return the id of the selected SubCluster
+   * @throws YarnException declared for the availability checks delegated to
+   *           the helper utilities
+   */
+  private SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubclusters,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
+
+    if (activeSubclusters == null || activeSubclusters.isEmpty()) {
+      RouterServerUtil.logAndThrowException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
+    }
+
+    List<SubClusterId> candidates =
+        new ArrayList<>(activeSubclusters.keySet());
+    FederationPolicyUtils.validateSubClusterAvailability(candidates,
+        blackListSubClusters);
+
+    // Drop the blacklisted SubClusters from the candidates read from the
+    // StateStore.
+    if (blackListSubClusters != null) {
+      candidates.removeAll(blackListSubClusters);
+    }
+
+    int pick = rand.nextInt(candidates.size());
+    return candidates.get(pick);
+  }
+
+  /**
+   * Looks up the cached interceptor for a SubCluster.
+   *
+   * @param subClusterId the SubCluster whose interceptor is requested
+   * @return the cached interceptor, or null (after logging an error) when the
+   *         cache has no entry for the SubCluster
+   */
+  @VisibleForTesting
+  protected DefaultRequestInterceptorREST getInterceptorForSubCluster(
+      SubClusterId subClusterId) {
+    if (!interceptors.containsKey(subClusterId)) {
+      LOG.error("The interceptor for SubCluster " + subClusterId
+          + " does not exist in the cache.");
+      return null;
+    }
+    return interceptors.get(subClusterId);
+  }
+
+  /**
+   * Instantiates the interceptor class configured under
+   * {@code YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS}, binds it
+   * to the given SubCluster and web application address, and stores it in the
+   * interceptor cache.
+   *
+   * @param subClusterId the SubCluster the new interceptor will talk to
+   * @param webAppAddress the web application address handed to the interceptor
+   * @return the newly created and cached interceptor
+   * @throws YarnRuntimeException if the configured class cannot be found or is
+   *           not a {@code DefaultRequestInterceptorREST}
+   */
+  private DefaultRequestInterceptorREST createInterceptorForSubCluster(
+      SubClusterId subClusterId, String webAppAddress) {
+
+    final Configuration conf = this.getConf();
+
+    String interceptorClassName =
+        conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
+    DefaultRequestInterceptorREST interceptorInstance = null;
+    try {
+      Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
+      if (DefaultRequestInterceptorREST.class
+          .isAssignableFrom(interceptorClass)) {
+        interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils
+            .newInstance(interceptorClass, conf);
+
+      } else {
+        throw new YarnRuntimeException(
+            "Class: " + interceptorClassName + " not instance of "
+                + DefaultRequestInterceptorREST.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      // The message names the component actually being instantiated here.
+      throw new YarnRuntimeException(
+          "Could not instantiate DefaultRequestInterceptorREST: "
+              + interceptorClassName,
+          e);
+    }
+
+    interceptorInstance.setWebAppAddress(webAppAddress);
+    interceptorInstance.setSubClusterId(subClusterId);
+    interceptors.put(subClusterId, interceptorInstance);
+    return interceptorInstance;
+  }
+
+  /**
+   * Returns the cached interceptor for a SubCluster, creating and caching a
+   * new one when the cache has none.
+   *
+   * @param subClusterId the SubCluster whose interceptor is requested
+   * @param webAppAddress the web application address used if a new
+   *          interceptor has to be created
+   * @return the cached or newly created interceptor
+   */
+  @VisibleForTesting
+  protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
+      SubClusterId subClusterId, String webAppAddress) {
+    DefaultRequestInterceptorREST cached =
+        getInterceptorForSubCluster(subClusterId);
+    if (cached != null) {
+      return cached;
+    }
+    return createInterceptorForSubCluster(subClusterId, webAppAddress);
+  }
+
+  /**
+   * Asks a randomly chosen active SubCluster for a new application, retrying
+   * on a different SubCluster when an attempt fails.
+   * <p>
+   * The active SubClusters are read once from the federation facade. Up to
+   * #numSubmitRetries attempts are made; a SubCluster whose call throws or
+   * does not answer with HTTP 200 is blacklisted for the rest of this request.
+   * <p>
+   * Outcomes:
+   * <p>
+   * A SubCluster answers with HTTP 200: its response is returned unchanged.
+   * <p>
+   * The active SubClusters cannot be read: INTERNAL_SERVER_ERROR.
+   * <p>
+   * No SubCluster can be selected: SERVICE_UNAVAILABLE.
+   * <p>
+   * Every attempt fails: INTERNAL_SERVER_ERROR.
+   */
+  @Override
+  public Response createNewApplication(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters;
+    try {
+      activeSubClusters = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+          .entity(e.getLocalizedMessage()).build();
+    }
+
+    // SubClusters that already failed for this request.
+    List<SubClusterId> failedSubClusters = new ArrayList<SubClusterId>();
+
+    int attempt = 0;
+    while (attempt < numSubmitRetries) {
+
+      SubClusterId target;
+      try {
+        target =
+            getRandomActiveSubCluster(activeSubClusters, failedSubClusters);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+
+      LOG.debug(
+          "getNewApplication try #" + attempt + " on SubCluster " + target);
+
+      String webAppAddress =
+          activeSubClusters.get(target).getRMWebServiceAddress();
+      DefaultRequestInterceptorREST interceptor =
+          getOrCreateInterceptorForSubCluster(target, webAppAddress);
+
+      Response reply = null;
+      try {
+        reply = interceptor.createNewApplication(hsr);
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new ApplicationId in SubCluster "
+            + target.getId(), e);
+      }
+
+      if (reply != null && reply.getStatus() == Status.OK.getStatusCode()) {
+        return reply;
+      }
+
+      // Missing or non-200 response from the ResourceManager: blacklist this
+      // SubCluster for the remainder of this request.
+      failedSubClusters.add(target);
+      ++attempt;
+    }
+
+    String errMsg = "Fail to create a new application.";
+    LOG.error(errMsg);
+    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+  }
+
+  /**
+   * Today, in YARN there are no checks of any applicationId submitted.
+   * <p>
+   * Base scenarios:
+   * <p>
+   * The Client submits an application to the Router. • The Router selects 
one
+   * SubCluster to forward the request. • The Router inserts a tuple into
+   * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
+   * State Store replies with the selected SubCluster (e.g. SC1). • The 
Router
+   * submits the request to the selected SubCluster.
+   * <p>
+   * In case of State Store failure:
+   * <p>
+   * The client submits an application to the Router. • The Router selects 
one
+   * SubCluster to forward the request. • The Router inserts a tuple into 
State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • Due to 
the
+   * State Store down the Router times out and it will retry depending on the
+   * FederationFacade settings. • The Router replies to the client with an 
error
+   * message.
+   * <p>
+   * If State Store fails after inserting the tuple: identical behavior as
+   * {@code RMWebServices}.
+   * <p>
+   * In case of Router failure:
+   * <p>
+   * Scenario 1 – Crash before submission to the ResourceManager
+   * <p>
+   * The Client submits an application to the Router. • The Router selects 
one
+   * SubCluster to forward the request. • The Router inserts a tuple into 
State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The 
Router
+   * crashes. • The Client timeouts and resubmits the application. • The 
Router
+   * selects one SubCluster to forward the request. • The Router inserts a 
tuple
+   * into State Store with the selected SubCluster (e.g. SC2) and the appId. 
•
+   * Because the tuple is already inserted in the State Store, it returns the
+   * previous selected SubCluster (e.g. SC1). • The Router submits the 
request
+   * to the selected SubCluster (e.g. SC1).
+   * <p>
+   * Scenario 2 – Crash after submission to the ResourceManager
+   * <p>
+   * • The Client submits an application to the Router. • The Router 
selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into 
State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The 
Router
+   * submits the request to the selected SubCluster. • The Router crashes. 
• The
+   * Client timeouts and resubmit the application. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into 
State
+   * Store with the selected SubCluster (e.g. SC2) and the appId. • The State
+   * Store replies with the selected SubCluster (e.g. SC1). • The Router 
submits
+   * the request to the selected SubCluster (e.g. SC1). When a client 
re-submits
+   * the same application to the same RM, it does not raise an exception and
+   * replies with operation successful message.
+   * <p>
+   * In case of Client failure: identical behavior as {@code RMWebServices}.
+   * <p>
+   * In case of ResourceManager failure:
+   * <p>
+   * The Client submits an application to the Router. • The Router selects 
one
+   * SubCluster to forward the request. • The Router inserts a tuple into 
State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The 
Router
+   * submits the request to the selected SubCluster. • The entire SubCluster 
is
+   * down – all the RMs in HA or the master RM is not reachable. • The 
Router
+   * times out. • The Router selects a new SubCluster to forward the 
request. •
+   * The Router update a tuple into State Store with the selected SubCluster
+   * (e.g. SC2) and the appId. • The State Store replies with OK answer. • 
The
+   * Router submits the request to the selected SubCluster (e.g. SC2).
+   */
+  @Override
+  public Response submitApplication(ApplicationSubmissionContextInfo newApp,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    if (newApp == null || newApp.getApplicationId() == null) {
+      String errMsg = "Missing ApplicationSubmissionContextInfo or "
+          + "applicationSubmissionContex information.";
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    ApplicationId applicationId = null;
+    try {
+      applicationId = ApplicationId.fromString(newApp.getApplicationId());
+    } catch (IllegalArgumentException e) {
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
+
+    // SubClusters that already failed for this request.
+    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      ApplicationSubmissionContext context =
+          RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
+
+      SubClusterId subClusterId = null;
+      try {
+        subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+      LOG.info("submitApplication appId " + applicationId + " try #" + i
+          + " on SubCluster " + subClusterId);
+
+      ApplicationHomeSubCluster appHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+
+      if (i == 0) {
+        try {
+          // persist the mapping of applicationId and the subClusterId which has
+          // been selected as its home
+          subClusterId =
+              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String errMsg = "Unable to insert the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          return Response.status(Status.SERVICE_UNAVAILABLE)
+              .entity(errMsg + " " + e.getLocalizedMessage()).build();
+        }
+      } else {
+        try {
+          // update the mapping of applicationId and the home subClusterId to
+          // the new subClusterId we have selected
+          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String errMsg = "Unable to update the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          SubClusterId subClusterIdInStateStore;
+          try {
+            subClusterIdInStateStore =
+                federationFacade.getApplicationHomeSubCluster(applicationId);
+          } catch (YarnException e1) {
+            return Response.status(Status.SERVICE_UNAVAILABLE)
+                .entity(e1.getLocalizedMessage()).build();
+          }
+          // Compare by value: the id read back from the StateStore may be a
+          // different instance than the one returned by the policy, so a
+          // reference comparison could reject a matching SubCluster.
+          if (subClusterId.equals(subClusterIdInStateStore)) {
+            LOG.info("Application " + applicationId
+                + " already submitted on SubCluster " + subClusterId);
+          } else {
+            return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
+                .build();
+          }
+        }
+      }
+
+      SubClusterInfo subClusterInfo;
+      try {
+        subClusterInfo = federationFacade.getSubCluster(subClusterId);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+
+      Response response = null;
+      try {
+        response = getOrCreateInterceptorForSubCluster(subClusterId,
+            subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
+                hsr);
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the application " + applicationId
+            + " to SubCluster " + subClusterId.getId(), e);
+      }
+
+      if (response != null && response.getStatus() == 202) {
+        LOG.info("Application " + context.getApplicationName() + " with appId "
+            + applicationId + " submitted on " + subClusterId);
+        return response;
+      } else {
+        // Missing or non-202 response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        blacklist.add(subClusterId);
+      }
+    }
+
+    String errMsg = "Application " + newApp.getApplicationName()
+        + " with appId " + applicationId + " failed to be submitted.";
+    LOG.error(errMsg);
+    return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
+  }
+
+  /**
+   * Forwards the request to the home SubCluster of the application, as
+   * recorded by the federation facade.
+   * <p>
+   * Returns null, without contacting any ResourceManager, when:
+   * <p>
+   * the application id cannot be parsed;
+   * <p>
+   * the federation facade reports no home SubCluster for the application;
+   * <p>
+   * the federation facade raises a {@code YarnException} while resolving the
+   * home SubCluster or its information.
+   */
+  @Override
+  public AppInfo getApp(HttpServletRequest hsr, String appId,
+      Set<String> unselectedFields) {
+
+    ApplicationId parsedAppId;
+    try {
+      parsedAppId = ApplicationId.fromString(appId);
+    } catch (IllegalArgumentException e) {
+      return null;
+    }
+
+    SubClusterId homeSubClusterId;
+    SubClusterInfo homeSubClusterInfo;
+    try {
+      homeSubClusterId =
+          federationFacade.getApplicationHomeSubCluster(parsedAppId);
+      if (homeSubClusterId == null) {
+        return null;
+      }
+      homeSubClusterInfo = federationFacade.getSubCluster(homeSubClusterId);
+    } catch (YarnException e) {
+      return null;
+    }
+
+    DefaultRequestInterceptorREST interceptor =
+        getOrCreateInterceptorForSubCluster(homeSubClusterId,
+            homeSubClusterInfo.getRMWebServiceAddress());
+    return interceptor.getApp(hsr, appId, unselectedFields);
+  }
+
+  /**
+   * Forwards the state update to the home SubCluster of the application, as
+   * recorded by the federation facade.
+   * <p>
+   * An application id that cannot be parsed yields a BAD_REQUEST response.
+   * <p>
+   * A {@code YarnException} raised by the federation facade while resolving
+   * the home SubCluster or its information is propagated to the caller.
+   */
+  @Override
+  public Response updateAppState(AppState targetState, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+
+    ApplicationId parsedAppId;
+    try {
+      parsedAppId = ApplicationId.fromString(appId);
+    } catch (IllegalArgumentException e) {
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
+
+    SubClusterId homeSubClusterId =
+        federationFacade.getApplicationHomeSubCluster(parsedAppId);
+    SubClusterInfo homeSubClusterInfo =
+        federationFacade.getSubCluster(homeSubClusterId);
+
+    DefaultRequestInterceptorREST interceptor =
+        getOrCreateInterceptorForSubCluster(homeSubClusterId,
+            homeSubClusterInfo.getRMWebServiceAddress());
+    return interceptor.updateAppState(targetState, hsr, appId);
+  }
+
+  @Override
+  public ClusterInfo get() {
+    return getClusterInfo();
+  }
+
+  @Override
+  public ClusterInfo getClusterInfo() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ClusterMetricsInfo getClusterMetricsInfo() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public SchedulerTypeInfo getSchedulerInfo() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodesInfo getNodes(String states) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodeInfo getNode(String nodeId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
+      String appId, String time) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
+      Set<String> stateQueries, Set<String> typeQueries) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+      Set<String> statesQuery, String finalStatusQuery, String userQuery,
+      String queueQuery, String count, String startedBegin, String startedEnd,
+      String finishBegin, String finishEnd, Set<String> applicationTypes,
+      Set<String> applicationTags, Set<String> unselectedFields) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppState getAppState(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
+      HttpServletRequest hsr) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
+      HttpServletRequest hsr, String nodeId) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response updateApplicationPriority(AppPriority targetPriority,
+      HttpServletRequest hsr, String appId) throws AuthorizationException,
+      YarnException, InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response postDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr) throws AuthorizationException, IOException,
+      InterruptedException, Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response postDelegationTokenExpiration(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response cancelDelegationToken(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response createNewReservation(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response submitReservation(ReservationSubmissionRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response updateReservation(ReservationUpdateRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response deleteReservation(ReservationDeleteRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response listReservation(String queue, String reservationId,
+      long startTime, long endTime, boolean includeResourceAllocations,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
+      String type) throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
+      HttpServletRequest hsr, String appId) throws AuthorizationException,
+      YarnException, InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AppAttemptInfo getAppAttempt(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ContainersInfo getContainers(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ContainerInfo getContainer(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId,
+      String containerId) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void setNextInterceptor(RESTRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+        + "FederationInterceptorREST, which should be the last one "
+        + "in the chain. Check if the interceptor pipeline configuration "
+        + "is correct");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index 223690f..7d42084 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -27,6 +27,7 @@ import java.security.PrivilegedExceptionAction;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,30 +76,34 @@ public abstract class BaseRouterWebServicesTest {
   private RouterWebServices routerWebService;
 
   @Before
-  public void setup() {
-    conf = new YarnConfiguration();
+  public void setUp() {
+    this.conf = createConfiguration();
 
+    router = spy(new Router());
+    Mockito.doNothing().when(router).startWepApp();
+    routerWebService = new RouterWebServices(router, conf);
+    routerWebService.setResponse(mock(HttpServletResponse.class));
+
+    router.init(conf);
+    router.start();
+  }
+
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
     String mockPassThroughInterceptorClass =
         PassThroughRESTRequestInterceptor.class.getName();
 
     // Create a request intercepter pipeline for testing. The last one in the
     // chain will call the mock resource manager. The others in the chain will
     // simply forward it to the next one in the chain
-    conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
+    config.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
             + "," + mockPassThroughInterceptorClass + ","
             + MockRESTRequestInterceptor.class.getName());
 
-    conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+    config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
         TEST_MAX_CACHE_SIZE);
-
-    router = spy(new Router());
-    Mockito.doNothing().when(router).startWepApp();
-    routerWebService = new RouterWebServices(router, conf);
-    routerWebService.setResponse(mock(HttpServletResponse.class));
-
-    router.init(conf);
-    router.start();
+    return config;
   }
 
   @After
@@ -108,6 +113,14 @@ public abstract class BaseRouterWebServicesTest {
     }
   }
 
+  public void setUpConfig() {
+    this.conf = createConfiguration();
+  }
+
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
   protected RouterWebServices getRouterWebServices() {
     Assert.assertNotNull(this.routerWebService);
     return this.routerWebService;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
new file mode 100644
index 0000000..91e601e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class mocks the RESTRequestInterceptor.
+ */
+public class MockDefaultRequestInterceptorREST
+    extends DefaultRequestInterceptorREST {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockDefaultRequestInterceptorREST.class);
+  final private AtomicInteger applicationCounter = new AtomicInteger(0);
+  // True if the mock RM is running, false otherwise.
+  // This flag lets tests simulate scenarios in which the YARN RM is down,
+  // e.g. a network issue or a failover.
+  private boolean isRunning = true;
+  private HashSet<ApplicationId> applicationMap = new HashSet<>();
+
+  private void validateRunning() throws ConnectException {
+    if (!isRunning) {
+      throw new ConnectException("RM is stopped");
+    }
+  }
+
+  @Override
+  public Response createNewApplication(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    validateRunning();
+
+    ApplicationId applicationId =
+        ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+            applicationCounter.incrementAndGet());
+    NewApplication appId =
+        new NewApplication(applicationId.toString(), new ResourceInfo());
+    return Response.status(Status.OK).entity(appId).build();
+  }
+
+  @Override
+  public Response submitApplication(ApplicationSubmissionContextInfo newApp,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    validateRunning();
+
+    ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+    LOG.info("Application submitted: " + appId);
+    applicationMap.add(appId);
+    return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
+        .entity(getSubClusterId()).build();
+  }
+
+  @Override
+  public AppInfo getApp(HttpServletRequest hsr, String appId,
+      Set<String> unselectedFields) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+
+    return new AppInfo();
+  }
+
+  /**
+   * Mocks the kill of an application tracked by this mock RM.
+   *
+   * @param targetState the requested state; {@code null} yields BAD_REQUEST
+   * @param hsr the servlet request (not used by this mock)
+   * @param appId the application id in string form
+   * @return OK carrying the requested state, or BAD_REQUEST when no target
+   *         state was supplied
+   * @throws ApplicationNotFoundException if the application is not tracked
+   * @throws IOException if the mock RM is marked as not running
+   */
+  @Override
+  public Response updateAppState(AppState targetState, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+    validateRunning();
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new ApplicationNotFoundException(
+          "Trying to kill an absent application: " + appId);
+    }
+
+    // Validate the request before mutating state: a rejected request must
+    // not remove the application from the mock RM.
+    if (targetState == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+
+    applicationMap.remove(applicationId);
+
+    LOG.info("Force killing application: " + appId);
+    AppState ret = new AppState();
+    ret.setState(targetState.toString());
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  public void setSubClusterId(int subClusterId) {
+    setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
+  }
+
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  public void setRunning(boolean runningMode) {
+    this.isRunning = runningMode;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
new file mode 100644
index 0000000..d918149
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
+ * to use the {@code RouterWebServices} pipeline test cases for testing the
+ * {@code FederationInterceptorREST} class. The tests for
+ * {@code RouterWebServices} have been written so that they can be reused to
+ * validate different request interceptor chains.
+ */
+public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptorREST.class);
+  private final static int NUM_SUBCLUSTER = 4;
+  private static final int BAD_REQUEST = 400;
+  private static final int ACCEPTED = 202;
+  private static String user = "test-user";
+  private TestableFederationInterceptorREST interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private List<SubClusterId> subClusters;
+
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationInterceptorREST();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        this.getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    subClusters = new ArrayList<>();
+
+    try {
+      for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        stateStoreUtil.registerSubCluster(sc);
+        subClusters.add(sc);
+      }
+    } catch (YarnException e) {
+      LOG.error(e.getMessage());
+      Assert.fail();
+    }
+
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  /**
+   * Builds the configuration used by these tests: federation enabled, the
+   * mock default REST interceptor, a REST interceptor pipeline ending in the
+   * testable federation interceptor, the uniform broadcast policy manager and
+   * the state store facade cache disabled.
+   *
+   * @return the configuration for the federation REST interceptor tests
+   */
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
+    config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    config.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+        MockDefaultRequestInterceptorREST.class.getName());
+    String mockPassThroughInterceptorClass =
+        PassThroughRESTRequestInterceptor.class.getName();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain is the federation interceptor that calls the mock resource
+    // manager. The others in the chain will simply forward it to the next one
+    // in the chain. This is the web app (REST) pipeline, so it uses the
+    // WEBAPP key, as the base test class does.
+    config.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + TestableFederationInterceptorREST.class.getName());
+
+    config.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    config.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return config;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication. The returned
+   * ApplicationId has to belong to one of the SubClusters in the cluster.
+   */
+  @Test
+  public void testGetNewApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    Response response = interceptor.createNewApplication(null);
+
+    Assert.assertNotNull(response);
+    NewApplication ci = (NewApplication) response.getEntity();
+    Assert.assertNotNull(ci);
+    ApplicationId appId = ApplicationId.fromString(ci.getApplicationId());
+    Assert.assertTrue(appId.getClusterTimestamp() < NUM_SUBCLUSTER);
+    Assert.assertTrue(appId.getClusterTimestamp() >= 0);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication. The application
+   * has to be submitted to one of the SubClusters in the cluster, and the
+   * SubCluster returned in the response has to match the home SubCluster
+   * recorded in the state store.
+   */
+  @Test
+  public void testSubmitApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    // Check for null before the response is dereferenced below.
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+    SubClusterId ci = (SubClusterId) response.getEntity();
+
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+    Assert.assertTrue(subClusters.contains(scIdResult));
+    Assert.assertEquals(ci, scIdResult);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of
+   * multiple submission. The first retry has to be submitted to the same
+   * SubCluster of the first attempt.
+   */
+  @Test
+  public void testSubmitApplicationMultipleSubmission()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // First attempt
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+
+    // First retry
+    response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+    SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult2);
+    Assert.assertEquals(scIdResult, scIdResult2);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of empty
+   * request.
+   */
+  @Test
+  public void testSubmitApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+
+    // ApplicationSubmissionContextInfo null
+    Response response = interceptor.submitApplication(null, null);
+
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+    // ApplicationSubmissionContextInfo empty
+    response = interceptor
+        .submitApplication(new ApplicationSubmissionContextInfo(), null);
+
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of an
+   * application id in the wrong format.
+   */
+  @Test
+  public void testSubmitApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId("Application_wrong_id");
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testForceKillApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we are going to kill later
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    AppState appState = new AppState("KILLED");
+
+    Response responseKill =
+        interceptor.updateAppState(appState, null, appId.toString());
+    Assert.assertNotNull(responseKill);
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application does not exist in the StateStore: the call is expected to
+   * fail with a YarnException reporting that the application does not exist.
+   */
+  @Test
+  public void testForceKillApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    AppState appState = new AppState("KILLED");
+    try {
+      interceptor.updateAppState(appState, null, appId.toString());
+      Assert.fail("Expected a YarnException for the unknown application "
+          + appId);
+    } catch (YarnException e) {
+      // assertEquals reports both strings on a mismatch, unlike
+      // assertTrue(a.equals(b)).
+      Assert.assertEquals("Application " + appId + " does not exist",
+          e.getMessage());
+    }
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case of
+   * application in wrong format.
+   */
+  @Test
+  public void testForceKillApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    AppState appState = new AppState("KILLED");
+    Response response =
+        interceptor.updateAppState(appState, null, "Application_wrong_id");
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case of
+   * empty request.
+   */
+  @Test
+  public void testForceKillApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we are going to kill later
+    interceptor.submitApplication(context, null);
+
+    Response response =
+        interceptor.updateAppState(null, null, appId.toString());
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationReport()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we want the report later
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    AppInfo responseGet = interceptor.getApp(null, appId.toString(), null);
+
+    Assert.assertNotNull(responseGet);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application does not exist in StateStore.
+   */
+  @Test
+  public void testGetApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    AppInfo response = interceptor.getApp(null, appId.toString(), null);
+
+    Assert.assertNull(response);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case of
+   * application in wrong format.
+   */
+  @Test
+  public void testGetApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    AppInfo response = interceptor.getApp(null, "Application_wrong_id", null);
+
+    Assert.assertNull(response);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
new file mode 100644
index 0000000..48bc1a8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import 
org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import 
org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
+import 
org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
+ * to use the {@code RouterWebServices} pipeline test cases for testing the
+ * {@code FederationInterceptorREST} class. The tests for
+ * {@code RouterWebServices} have been written cleverly so that they can be
+ * reused to validate different request interceptor chains.
+ * <p>
+ * It tests the case with SubClusters down and the Router logic of retries. We
+ * have 1 good SubCluster and 2 bad ones for all the tests.
+ */
+public class TestFederationInterceptorRESTRetry
+    extends BaseRouterWebServicesTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptorRESTRetry.class);
+  private static final int SERVICE_UNAVAILABLE = 503;
+  private static final int ACCEPTED = 202;
+  private static final int OK = 200;
+  // running and registered
+  private static SubClusterId good;
+  // registered but not running
+  private static SubClusterId bad1;
+  private static SubClusterId bad2;
+  private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
+  private TestableFederationInterceptorREST interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private String user = "test-user";
+
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationInterceptorREST();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    // Create SubClusters
+    good = SubClusterId.newInstance("0");
+    bad1 = SubClusterId.newInstance("1");
+    bad2 = SubClusterId.newInstance("2");
+    scs.add(good);
+    scs.add(bad1);
+    scs.add(bad2);
+
+    // The mock RM will not start in these SubClusters, this is done to simulate
+    // a SubCluster down
+
+    interceptor.registerBadSubCluster(bad1);
+    interceptor.registerBadSubCluster(bad2);
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  private void setupCluster(List<SubClusterId> scsToRegister)
+      throws YarnException {
+
+    try {
+      // Clean up the StateStore before every test
+      stateStoreUtil.deregisterAllSubClusters();
+
+      for (SubClusterId sc : scsToRegister) {
+        stateStoreUtil.registerSubCluster(sc);
+      }
+    } catch (YarnException e) {
+      LOG.error(e.getMessage());
+      Assert.fail();
+    }
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+        MockDefaultRequestInterceptorREST.class.getName());
+
+    String mockPassThroughInterceptorClass =
+        PassThroughClientRequestInterceptor.class.getName();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain is the federation interceptor that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + TestableFederationClientInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return conf;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testGetNewApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    Response response = interceptor.createNewApplication(null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testGetNewApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    Response response = interceptor.createNewApplication(null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster and 1 good one.
+   */
+  @Test
+  public void testGetNewApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test getNewApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+    Response response = interceptor.createNewApplication(null);
+
+    Assert.assertEquals(OK, response.getStatus());
+
+    NewApplication newApp = (NewApplication) response.getEntity();
+    ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        appId.getClusterTimestamp());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testSubmitApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testSubmitApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster and a good one.
+   */
+  @Test
+  public void testSubmitApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test submitApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+
+    Assert.assertEquals(good,
+        stateStore
+            .getApplicationHomeSubCluster(
+                GetApplicationHomeSubClusterRequest.newInstance(appId))
+            .getApplicationHomeSubCluster().getHomeSubCluster());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
new file mode 100644
index 0000000..ce5bb23
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * Extends the FederationInterceptorREST and overrides methods to provide a
+ * testable implementation of FederationInterceptorREST.
+ */
+public class TestableFederationInterceptorREST
+    extends FederationInterceptorREST {
+
+  private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
+
+  /**
+   * For testing purposes, some subclusters have to be down to simulate
+   * particular scenarios such as RM failover or network issues, so we keep
+   * track of these bad subclusters. This method makes the subcluster unusable.
+   *
+   * @param badSC the subcluster to make unusable
+   */
+  protected void registerBadSubCluster(SubClusterId badSC) {
+
+    // Adding in the cache the bad SubCluster, in this way we can stop them
+    getOrCreateInterceptorForSubCluster(badSC, "test");
+
+    badSubCluster.add(badSC);
+    MockDefaultRequestInterceptorREST interceptor =
+        (MockDefaultRequestInterceptorREST) super.getInterceptorForSubCluster(
+            badSC);
+    interceptor.setRunning(false);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc59b5fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index ecf61c5..3e3580c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -216,7 +216,7 @@ Optional:
 |`yarn.router.submit.retry` | `3` | The number of retries in the router before 
we give up. |
 |`yarn.federation.statestore.max-connections` | `10` | This is the maximum 
number of parallel connections each Router makes to the state-store. |
 |`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and 
this is the time to leave before the cache is invalidated. |
-
+|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
 
 ###ON NMs:
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to