Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 bb3c07fa3 -> df3ff9042


HDFS-13364. RBF: Support NamenodeProtocol in the Router. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2be64eb2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2be64eb2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2be64eb2

Branch: refs/heads/HDFS-7240
Commit: 2be64eb201134502a92f7239bef8aa780771ca0b
Parents: 1077392
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Apr 3 15:08:40 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Apr 3 15:08:40 2018 +0800

----------------------------------------------------------------------
 .../federation/router/ConnectionContext.java    |  35 +++-
 .../federation/router/ConnectionManager.java    |  10 +-
 .../federation/router/ConnectionPool.java       |  98 +++++++++-
 .../federation/router/ConnectionPoolId.java     |  19 +-
 .../server/federation/router/RemoteMethod.java  |  68 ++++++-
 .../router/RouterNamenodeProtocol.java          | 187 +++++++++++++++++++
 .../federation/router/RouterRpcClient.java      |  56 ++++--
 .../federation/router/RouterRpcServer.java      | 111 ++++++++++-
 .../server/federation/MiniRouterDFSCluster.java |   8 +
 .../router/TestConnectionManager.java           |  56 +++++-
 .../server/federation/router/TestRouterRpc.java | 115 ++++++++++--
 11 files changed, 698 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index 1d27b51..7e779b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.ipc.RPC;
 
 /**
@@ -26,18 +27,24 @@ import org.apache.hadoop.ipc.RPC;
  * a connection, it increments a counter to mark it as active. Once the client
  * is done with the connection, it decreases the counter. It also takes care of
  * closing the connection once it is not active.
+ *
+ * The protocols currently used are:
+ * <ul>
+ * <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
+ * <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
+ * </ul>
  */
 public class ConnectionContext {
 
   /** Client for the connection. */
-  private final ProxyAndInfo<ClientProtocol> client;
+  private final ProxyAndInfo<?> client;
   /** How many threads are using this connection. */
   private int numThreads = 0;
   /** If the connection is closed. */
   private boolean closed = false;
 
 
-  public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
+  public ConnectionContext(ProxyAndInfo<?> connection) {
     this.client = connection;
   }
 
@@ -74,7 +81,7 @@ public class ConnectionContext {
    *
    * @return Connection client.
    */
-  public synchronized ProxyAndInfo<ClientProtocol> getClient() {
+  public synchronized ProxyAndInfo<?> getClient() {
     this.numThreads++;
     return this.client;
   }
@@ -96,9 +103,27 @@ public class ConnectionContext {
   public synchronized void close() {
     this.closed = true;
     if (this.numThreads == 0) {
-      ClientProtocol proxy = this.client.getProxy();
+      Object proxy = this.client.getProxy();
       // Nobody should be using this anymore so it should close right away
       RPC.stopProxy(proxy);
     }
   }
+
+  @Override
+  public String toString() {
+    InetSocketAddress addr = this.client.getAddress();
+    Object proxy = this.client.getProxy();
+    Class<?> clazz = proxy.getClass();
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(clazz.getSimpleName());
+    sb.append("@");
+    sb.append(addr);
+    sb.append("x");
+    sb.append(numThreads);
+    if (closed) {
+      sb.append("[CLOSED]");
+    }
+    return sb.toString();
+  }
 }
\ No newline at end of file

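As context for the change above: getClient() now returns ProxyAndInfo<?>, so the caller is responsible for casting the proxy to the protocol class its pool was created for. A minimal sketch (not part of the patch; it assumes a pool built for NamenodeProtocol):

    ConnectionContext connection = pool.getConnection();
    ProxyAndInfo<?> client = connection.getClient(); // increments the usage counter
    // The caller knows which protocol the pool serves and casts accordingly
    NamenodeProtocol proxy = (NamenodeProtocol) client.getProxy();
    long txId = proxy.getTransactionID();
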
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index 97c6403..0b50845 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -166,11 +166,12 @@ public class ConnectionManager {
    *
    * @param ugi User group information.
    * @param nnAddress Namenode address for the connection.
+   * @param protocol Protocol for the connection.
    * @return Proxy client to connect to nnId as UGI.
    * @throws IOException If the connection cannot be obtained.
    */
-  public ConnectionContext getConnection(
-      UserGroupInformation ugi, String nnAddress) throws IOException {
+  public ConnectionContext getConnection(UserGroupInformation ugi,
+      String nnAddress, Class<?> protocol) throws IOException {
 
     // Check if the manager is shutdown
     if (!this.running) {
@@ -181,7 +182,8 @@ public class ConnectionManager {
     }
 
     // Try to get the pool if created
-    ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
+    ConnectionPoolId connectionId =
+        new ConnectionPoolId(ugi, nnAddress, protocol);
     ConnectionPool pool = null;
     readLock.lock();
     try {
@@ -197,7 +199,7 @@ public class ConnectionManager {
         pool = this.pools.get(connectionId);
         if (pool == null) {
           pool = new ConnectionPool(
-              this.conf, nnAddress, ugi, this.minSize, this.maxSize);
+              this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol);
           this.pools.put(connectionId, pool);
         }
       } finally {

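Because the protocol class is now part of the pool identifier, the same user and Namenode address can be served by separate pools, one per protocol. A sketch (the address "nn0:8020" is a placeholder):

    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // These hit two distinct pools: the ConnectionPoolId differs in protocol
    ConnectionContext cc1 =
        connectionManager.getConnection(ugi, "nn0:8020", ClientProtocol.class);
    ConnectionContext cc2 =
        connectionManager.getConnection(ugi, "nn0:8020", NamenodeProtocol.class);
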
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 5af8a86..6b416dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryUtils;
@@ -75,6 +78,8 @@ public class ConnectionPool {
   private final String namenodeAddress;
   /** User for these connections. */
   private final UserGroupInformation ugi;
+  /** Class of the protocol. */
+  private final Class<?> protocol;
 
   /** Pool of connections. We mimic a COW array. */
   private volatile List<ConnectionContext> connections = new ArrayList<>();
@@ -91,16 +96,17 @@ public class ConnectionPool {
 
 
   protected ConnectionPool(Configuration config, String address,
-      UserGroupInformation user, int minPoolSize, int maxPoolSize)
-          throws IOException {
+      UserGroupInformation user, int minPoolSize, int maxPoolSize,
+      Class<?> proto) throws IOException {
 
     this.conf = config;
 
     // Connection pool target
     this.ugi = user;
     this.namenodeAddress = address;
+    this.protocol = proto;
     this.connectionPoolId =
-        new ConnectionPoolId(this.ugi, this.namenodeAddress);
+        new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol);
 
     // Set configuration parameters for the pool
     this.minSize = minPoolSize;
@@ -287,7 +293,8 @@ public class ConnectionPool {
    * @throws IOException
    */
   public ConnectionContext newConnection() throws IOException {
-    return newConnection(this.conf, this.namenodeAddress, this.ugi);
+    return newConnection(
+        this.conf, this.namenodeAddress, this.ugi, this.protocol);
   }
 
   /**
@@ -299,12 +306,46 @@ public class ConnectionPool {
    * @param conf Configuration for the connection.
    * @param nnAddress Address of server supporting the ClientProtocol.
    * @param ugi User context.
-   * @return Proxy for the target ClientProtocol that contains the user's
+   * @param proto Interface of the protocol.
+   * @return Proxy for the target protocol that contains the user's
    *         security context.
    * @throws IOException If it cannot be created.
    */
   protected static ConnectionContext newConnection(Configuration conf,
-      String nnAddress, UserGroupInformation ugi)
+      String nnAddress, UserGroupInformation ugi, Class<?> proto)
+          throws IOException {
+    ConnectionContext ret;
+    if (proto == ClientProtocol.class) {
+      ret = newClientConnection(conf, nnAddress, ugi);
+    } else if (proto == NamenodeProtocol.class) {
+      ret = newNamenodeConnection(conf, nnAddress, ugi);
+    } else {
+      String msg = "Unsupported protocol for connection to NameNode: " +
+          ((proto != null) ? proto.getName() : "null");
+      LOG.error(msg);
+      throw new IllegalStateException(msg);
+    }
+    return ret;
+  }
+
+  /**
+   * Creates a proxy wrapper for a client NN connection. Each proxy contains
+   * context for a single user/security context. To maximize throughput it is
+   * recommended to use multiple connections per user+server, allowing multiple
+   * writes and reads to be dispatched in parallel.
+   *
+   * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
+   * connection identifier.
+   *
+   * @param conf Configuration for the connection.
+   * @param nnAddress Address of server supporting the ClientProtocol.
+   * @param ugi User context.
+   * @return Proxy for the target ClientProtocol that contains the user's
+   *         security context.
+   * @throws IOException If it cannot be created.
+   */
+  private static ConnectionContext newClientConnection(
+      Configuration conf, String nnAddress, UserGroupInformation ugi)
           throws IOException {
     RPC.setProtocolEngine(
         conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
@@ -334,4 +375,49 @@ public class ConnectionPool {
     ConnectionContext connection = new ConnectionContext(clientProxy);
     return connection;
   }
+
+  /**
+   * Creates a proxy wrapper for a NN connection. Each proxy contains context
+   * for a single user/security context. To maximize throughput it is
+   * recommended to use multiple connections per user+server, allowing multiple
+   * writes and reads to be dispatched in parallel.
+   *
+   * @param conf Configuration for the connection.
+   * @param nnAddress Address of server supporting the NamenodeProtocol.
+   * @param ugi User context.
+   * @return Proxy for the target NamenodeProtocol that contains the user's
+   *         security context.
+   * @throws IOException If it cannot be created.
+   */
+  private static ConnectionContext newNamenodeConnection(
+      Configuration conf, String nnAddress, UserGroupInformation ugi)
+          throws IOException {
+    RPC.setProtocolEngine(
+        conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
+        conf,
+        HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
+        HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
+        HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
+        HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
+        HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
+
+    SocketFactory factory = SocketFactory.getDefault();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SaslRpcServer.init(conf);
+    }
+    InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
+    final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
+    NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
+        version, socket, ugi, conf,
+        factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+    NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
+    Text dtService = SecurityUtil.buildTokenService(socket);
+
+    ProxyAndInfo<NamenodeProtocol> clientProxy =
+        new ProxyAndInfo<NamenodeProtocol>(client, dtService, socket);
+    ConnectionContext connection = new ConnectionContext(clientProxy);
+    return connection;
+  }
 }

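newConnection() now dispatches on the protocol class: ClientProtocol keeps the existing ClientNamenodeProtocolTranslatorPB path, while NamenodeProtocol goes through the new NamenodeProtocolTranslatorPB path. A sketch of building a pool for the new protocol (the address and pool sizes are illustrative):

    ConnectionPool nnPool = new ConnectionPool(
        conf, "nn0:8020", ugi, 1, 10, NamenodeProtocol.class);
    // Internally routed to newNamenodeConnection(conf, address, ugi)
    ConnectionContext conn = nnPool.newConnection();
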
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
index 6e1ee9a..458fec2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
@@ -42,16 +42,21 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
   private final String nnId;
   /** Information about the user. */
   private final UserGroupInformation ugi;
+  /** Protocol for the connection. */
+  private final Class<?> protocol;
 
   /**
    * New connection pool identifier.
    *
    * @param ugi Information of the user issuing the request.
    * @param nnId Namenode address with port.
+   * @param proto Protocol of the connection.
    */
-  public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
+  public ConnectionPoolId(final UserGroupInformation ugi, final String nnId,
+      final Class<?> proto) {
     this.nnId = nnId;
     this.ugi = ugi;
+    this.protocol = proto;
   }
 
   @Override
@@ -60,6 +65,7 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
         .append(this.nnId)
         .append(this.ugi.toString())
         .append(this.getTokenIds())
+        .append(this.protocol)
         .toHashCode();
     return hash;
   }
@@ -76,14 +82,18 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
       }
       String thisTokens = this.getTokenIds().toString();
       String otherTokens = other.getTokenIds().toString();
-      return thisTokens.equals(otherTokens);
+      if (!thisTokens.equals(otherTokens)) {
+        return false;
+      }
+      return this.protocol.equals(other.protocol);
     }
     return false;
   }
 
   @Override
   public String toString() {
-    return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
+    return this.ugi + " " + this.getTokenIds() + "->" + this.nnId + " [" +
+        this.protocol.getSimpleName() + "]";
   }
 
   @Override
@@ -97,6 +107,9 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
       String otherTokens = other.getTokenIds().toString();
       ret = thisTokens.compareTo(otherTokens);
     }
+    if (ret == 0) {
+      ret = this.protocol.toString().compareTo(other.protocol.toString());
+    }
     return ret;
   }
 

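hashCode(), equals() and compareTo() all fold in the protocol, so identifiers that differ only in protocol never collide in the pool map. For example (a sketch; ugi is any UserGroupInformation and the address is a placeholder):

    ConnectionPoolId clientId =
        new ConnectionPoolId(ugi, "nn0:8020", ClientProtocol.class);
    ConnectionPoolId namenodeId =
        new ConnectionPoolId(ugi, "nn0:8020", NamenodeProtocol.class);
    assert !clientId.equals(namenodeId);        // same user/address, different protocol
    assert clientId.compareTo(namenodeId) != 0; // protocol also breaks ordering ties
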
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
index 7978105..6ff2b01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -38,22 +38,35 @@ public class RemoteMethod {
   private final Object[] params;
   /** List of method parameters types, matches parameters. */
   private final Class<?>[] types;
+  /** Class of the protocol for the method. */
+  private final Class<?> protocol;
   /** String name of the ClientProtocol method. */
   private final String methodName;
 
   /**
+   * Create a remote method generator for the ClientProtocol with no parameters.
+   *
+   * @param method The string name of the protocol method.
+   */
+  public RemoteMethod(String method) {
+    this(ClientProtocol.class, method);
+  }
+
+  /**
    * Create a method with no parameters.
    *
+   * @param proto Protocol of the method.
    * @param method The string name of the ClientProtocol method.
    */
-  public RemoteMethod(String method) {
+  public RemoteMethod(Class<?> proto, String method) {
     this.params = null;
     this.types = null;
     this.methodName = method;
+    this.protocol = proto;
   }
 
   /**
-   * Creates a remote method generator.
+   * Create a remote method generator for the ClientProtocol.
    *
    * @param method The string name of the ClientProtocol method.
    * @param pTypes A list of types to use to locate the specific method.
@@ -70,17 +83,50 @@ public class RemoteMethod {
    */
   public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
       throws IOException {
+    this(ClientProtocol.class, method, pTypes, pParams);
+  }
+
+  /**
+   * Creates a remote method generator.
+   *
+   * @param proto Protocol of the method.
+   * @param method The string name of the ClientProtocol method.
+   * @param pTypes A list of types to use to locate the specific method.
+   * @param pParams A list of parameters for the method. The order of the
+   *          parameter list must match the order and number of the types.
+   *          Parameters are grouped into 2 categories:
+   *          <ul>
+   *          <li>Static parameters that are immutable across locations.
+   *          <li>Dynamic parameters that are determined for each location by a
+   *          RemoteParam object. To specify a dynamic parameter, pass an
+   *          instance of RemoteParam in place of the parameter value.
+   *          </ul>
+   * @throws IOException If the types and parameter lists are not valid.
+   */
+  public RemoteMethod(Class<?> proto, String method, Class<?>[] pTypes,
+      Object... pParams) throws IOException {
 
     if (pParams.length != pTypes.length) {
       throw new IOException("Invalid parameters for method " + method);
     }
 
+    this.protocol = proto;
     this.params = pParams;
     this.types = Arrays.copyOf(pTypes, pTypes.length);
     this.methodName = method;
   }
 
   /**
+   * Get the interface/protocol for this method. For example, ClientProtocol or
+   * NamenodeProtocol.
+   *
+   * @return Protocol for this method.
+   */
+  public Class<?> getProtocol() {
+    return this.protocol;
+  }
+
+  /**
    * Get the represented java method.
    *
    * @return Method
@@ -89,18 +135,18 @@ public class RemoteMethod {
   public Method getMethod() throws IOException {
     try {
       if (types != null) {
-        return ClientProtocol.class.getDeclaredMethod(methodName, types);
+        return protocol.getDeclaredMethod(methodName, types);
       } else {
-        return ClientProtocol.class.getDeclaredMethod(methodName);
+        return protocol.getDeclaredMethod(methodName);
       }
     } catch (NoSuchMethodException e) {
       // Re-throw as an IOException
-      LOG.error("Cannot get method {} with types {}",
-          methodName, Arrays.toString(types), e);
+      LOG.error("Cannot get method {} with types {} from {}",
+          methodName, Arrays.toString(types), protocol.getSimpleName(), e);
       throw new IOException(e);
     } catch (SecurityException e) {
-      LOG.error("Cannot access method {} with types {}",
-          methodName, Arrays.toString(types), e);
+      LOG.error("Cannot access method {} with types {} from {}",
+          methodName, Arrays.toString(types), protocol.getSimpleName(), e);
       throw new IOException(e);
     }
   }
@@ -161,4 +207,10 @@ public class RemoteMethod {
     }
     return objList;
   }
+
+  @Override
+  public String toString() {
+    return this.protocol.getSimpleName() + "#" + this.methodName + " " +
+        Arrays.toString(this.params);
+  }
 }

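Existing call sites keep compiling because the old constructors delegate with ClientProtocol.class; only NamenodeProtocol calls need to name the protocol. A sketch (the getBlocks() arguments mirror the parameters of RouterNamenodeProtocol#getBlocks below):

    // Defaults to ClientProtocol, exactly as before this patch
    RemoteMethod m1 = new RemoteMethod("getStoragePolicies");
    // NamenodeProtocol method with an explicit protocol class
    RemoteMethod m2 = new RemoteMethod(NamenodeProtocol.class, "getBlocks",
        new Class<?>[] {DatanodeInfo.class, long.class, long.class},
        datanode, size, minBlockSize);
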
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
new file mode 100644
index 0000000..0433650
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+/**
+ * Module that implements all the RPC calls in {@link NamenodeProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterNamenodeProtocol implements NamenodeProtocol {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to map global name space to HDFS subcluster name spaces. */
+  private final FileSubclusterResolver subclusterResolver;
+
+
+  public RouterNamenodeProtocol(RouterRpcServer server) {
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.subclusterResolver = this.rpcServer.getSubclusterResolver();
+  }
+
+  @Override
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
+      long minBlockSize) throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ);
+
+    // Get the namespace where the datanode is located
+    Map<String, DatanodeStorageReport[]> map =
+        rpcServer.getDatanodeStorageReportMap(DatanodeReportType.ALL);
+    String nsId = null;
+    for (Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
+      DatanodeStorageReport[] dns = entry.getValue();
+      for (DatanodeStorageReport dn : dns) {
+        DatanodeInfo dnInfo = dn.getDatanodeInfo();
+        if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
+          nsId = entry.getKey();
+          break;
+        }
+      }
+      // Break the loop if already found
+      if (nsId != null) {
+        break;
+      }
+    }
+
+    // Forward to the proper namenode
+    if (nsId != null) {
+      RemoteMethod method = new RemoteMethod(
+          NamenodeProtocol.class, "getBlocks",
+          new Class<?>[] {DatanodeInfo.class, long.class, long.class},
+          datanode, size, minBlockSize);
+      return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
+    }
+    return null;
+  }
+
+  @Override
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ);
+
+    // We return the information from the default name space
+    String defaultNsId = subclusterResolver.getDefaultNamespace();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
+    return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class);
+  }
+
+  @Override
+  public long getTransactionID() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ);
+
+    // We return the information from the default name space
+    String defaultNsId = subclusterResolver.getDefaultNamespace();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+    return rpcClient.invokeSingle(defaultNsId, method, long.class);
+  }
+
+  @Override
+  public long getMostRecentCheckpointTxId() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ);
+
+    // We return the information from the default name space
+    String defaultNsId = subclusterResolver.getDefaultNamespace();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
+    return rpcClient.invokeSingle(defaultNsId, method, long.class);
+  }
+
+  @Override
+  public CheckpointSignature rollEditLog() throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE, false);
+    return null;
+  }
+
+  @Override
+  public NamespaceInfo versionRequest() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ);
+
+    // We return the information from the default name space
+    String defaultNsId = subclusterResolver.getDefaultNamespace();
+    RemoteMethod method =
+        new RemoteMethod(NamenodeProtocol.class, "versionRequest");
+    return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class);
+  }
+
+  @Override
+  public void errorReport(NamenodeRegistration registration, int errorCode,
+      String msg) throws IOException {
+    rpcServer.checkOperation(OperationCategory.UNCHECKED, false);
+  }
+
+  @Override
+  public NamenodeRegistration registerSubordinateNamenode(
+      NamenodeRegistration registration) throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE, false);
+    return null;
+  }
+
+  @Override
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+      throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE, false);
+    return null;
+  }
+
+  @Override
+  public void endCheckpoint(NamenodeRegistration registration,
+      CheckpointSignature sig) throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE, false);
+  }
+
+  @Override
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+      throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ, false);
+    return null;
+  }
+
+  @Override
+  public boolean isUpgradeFinalized() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ, false);
+    return false;
+  }
+
+  @Override
+  public boolean isRollingUpgrade() throws IOException {
+    rpcServer.checkOperation(OperationCategory.READ, false);
+    return false;
+  }
+}

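This makes the Router answer NamenodeProtocol callers such as the Balancer. Mirroring the new tests, a client-side sketch (routerUri stands for the Router's file system URI):

    NamenodeProtocol nnProtocol = NameNodeProxies.createProxy(
        conf, routerUri, NamenodeProtocol.class).getProxy();
    // Served by the default namespace
    ExportedBlockKeys keys = nnProtocol.getBlockKeys();
    // Routed to the namespace where the datanode is registered
    BlocksWithLocations blocks = nnProtocol.getBlocks(datanode, 1024, 0);
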
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index c973aa6..ecb9f50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -48,7 +48,6 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
@@ -225,14 +224,14 @@ public class RouterRpcClient {
    *
    * @param ugi User group information.
    * @param nsId Nameservice identifier.
-   * @param rpcAddress ClientProtocol RPC server address of the NN.
+   * @param rpcAddress RPC server address of the NN.
+   * @param proto Protocol of the connection.
    * @return ConnectionContext containing a ClientProtocol proxy client for the
    *         NN + current user.
    * @throws IOException If we cannot get a connection to the NameNode.
    */
-  private ConnectionContext getConnection(
-      UserGroupInformation ugi, String nsId, String rpcAddress)
-          throws IOException {
+  private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
+      String rpcAddress, Class<?> proto) throws IOException {
     ConnectionContext connection = null;
     try {
       // Each proxy holds the UGI info for the current user when it is created.
@@ -242,7 +241,7 @@ public class RouterRpcClient {
       // for each individual request.
 
       // TODO Add tokens from the federated UGI
-      connection = this.connectionManager.getConnection(ugi, rpcAddress);
+      connection = this.connectionManager.getConnection(ugi, rpcAddress, proto);
       LOG.debug("User {} NN {} is using connection {}",
           ugi.getUserName(), rpcAddress, connection);
     } catch (Exception ex) {
@@ -326,7 +325,8 @@ public class RouterRpcClient {
   private Object invokeMethod(
       final UserGroupInformation ugi,
       final List<? extends FederationNamenodeContext> namenodes,
-      final Method method, final Object... params) throws IOException {
+      final Class<?> protocol, final Method method, final Object... params)
+          throws IOException {
 
     if (namenodes == null || namenodes.isEmpty()) {
       throw new IOException("No namenodes to invoke " + method.getName() +
@@ -344,9 +344,10 @@ public class RouterRpcClient {
       try {
         String nsId = namenode.getNameserviceId();
         String rpcAddress = namenode.getRpcAddress();
-        connection = this.getConnection(ugi, nsId, rpcAddress);
-        ProxyAndInfo<ClientProtocol> client = connection.getClient();
-        ClientProtocol proxy = client.getProxy();
+        connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
+        ProxyAndInfo<?> client = connection.getClient();
+        final Object proxy = client.getProxy();
+
         ret = invoke(nsId, 0, method, proxy, params);
         if (failover) {
           // Success on alternate server, update
@@ -611,7 +612,29 @@ public class RouterRpcClient {
     List<? extends FederationNamenodeContext> nns =
         getNamenodesForNameservice(nsId);
     RemoteLocationContext loc = new RemoteLocation(nsId, "/");
-    return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
+    Class<?> proto = method.getProtocol();
+    Method m = method.getMethod();
+    Object[] params = method.getParams(loc);
+    return invokeMethod(ugi, nns, proto, m, params);
+  }
+
+  /**
+   * Invokes a remote method against the specified namespace.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Target namespace for the method.
+   * @param method The remote method and parameters to invoke.
+   * @param clazz Class for the return type.
+   * @return The result of invoking the method.
+   * @throws IOException If the invoke generated an error.
+   */
+  public <T> T invokeSingle(final String nsId, RemoteMethod method,
+      Class<T> clazz) throws IOException {
+    @SuppressWarnings("unchecked")
+    T ret = (T)invokeSingle(nsId, method);
+    return ret;
   }
 
   /**
@@ -689,8 +712,9 @@ public class RouterRpcClient {
       List<? extends FederationNamenodeContext> namenodes =
           getNamenodesForNameservice(ns);
       try {
+        Class<?> proto = remoteMethod.getProtocol();
         Object[] params = remoteMethod.getParams(loc);
-        Object result = invokeMethod(ugi, namenodes, m, params);
+        Object result = invokeMethod(ugi, namenodes, proto, m, params);
         // Check if the result is what we expected
         if (isExpectedClass(expectedResultClass, result) &&
             isExpectedValue(expectedResultValue, result)) {
@@ -914,8 +938,9 @@ public class RouterRpcClient {
       String ns = location.getNameserviceId();
       final List<? extends FederationNamenodeContext> namenodes =
           getNamenodesForNameservice(ns);
+      Class<?> proto = method.getProtocol();
       Object[] paramList = method.getParams(location);
-      Object result = invokeMethod(ugi, namenodes, m, paramList);
+      Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
       return Collections.singletonMap(location, clazz.cast(result));
     }
 
@@ -925,6 +950,7 @@ public class RouterRpcClient {
       String nsId = location.getNameserviceId();
       final List<? extends FederationNamenodeContext> namenodes =
           getNamenodesForNameservice(nsId);
+      final Class<?> proto = method.getProtocol();
       final Object[] paramList = method.getParams(location);
       if (standby) {
         // Call the objectGetter to all NNs (including standby)
@@ -939,7 +965,7 @@ public class RouterRpcClient {
           orderedLocations.add(nnLocation);
           callables.add(new Callable<Object>() {
             public Object call() throws Exception {
-              return invokeMethod(ugi, nnList, m, paramList);
+              return invokeMethod(ugi, nnList, proto, m, paramList);
             }
           });
         }
@@ -948,7 +974,7 @@ public class RouterRpcClient {
         orderedLocations.add(location);
         callables.add(new Callable<Object>() {
           public Object call() throws Exception {
-            return invokeMethod(ugi, namenodes, m, paramList);
+            return invokeMethod(ugi, namenodes, proto, m, paramList);
           }
         });
       }

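The typed invokeSingle() overload saves each caller an unchecked cast; RouterNamenodeProtocol uses it for every single-namespace call. A sketch ("ns0" is a placeholder nameservice id):

    RemoteMethod method =
        new RemoteMethod(NamenodeProtocol.class, "versionRequest");
    NamespaceInfo info =
        rpcClient.invokeSingle("ns0", method, NamespaceInfo.class);
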
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 383fd77..1159289 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -101,9 +101,13 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -113,11 +117,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -145,7 +156,8 @@ import com.google.protobuf.BlockingService;
  * the requests to the active
  * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
  */
-public class RouterRpcServer extends AbstractService implements ClientProtocol {
+public class RouterRpcServer extends AbstractService
+    implements ClientProtocol, NamenodeProtocol {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RouterRpcServer.class);
@@ -191,6 +203,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   private final Quota quotaCall;
   /** Erasure coding calls. */
   private final ErasureCoding erasureCoding;
+  /** NamenodeProtocol calls. */
+  private final RouterNamenodeProtocol nnProto;
 
 
   /**
@@ -243,6 +257,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     BlockingService clientNNPbService = ClientNamenodeProtocol
         .newReflectiveBlockingService(clientProtocolServerTranslator);
 
+    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
+        new NamenodeProtocolServerSideTranslatorPB(this);
+    BlockingService nnPbService = NamenodeProtocolService
+        .newReflectiveBlockingService(namenodeProtocolXlator);
+
     InetSocketAddress confRpcAddress = conf.getSocketAddr(
         RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
         RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
@@ -261,6 +280,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
         .setQueueSizePerHandler(handlerQueueSize)
         .setVerbose(false)
         .build();
+
+    // Add all the RPC protocols that the Router implements
+    DFSUtil.addPBProtocol(
+        conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
+
     // We don't want the server to log the full stack trace for some exceptions
     this.rpcServer.addTerseExceptions(
         RemoteException.class,
@@ -292,6 +316,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     // Initialize modules
     this.quotaCall = new Quota(this.router, this);
     this.erasureCoding = new ErasureCoding(this);
+    this.nnProto = new RouterNamenodeProtocol(this);
   }
 
   @Override
@@ -337,6 +362,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   }
 
   /**
+   * Get the subcluster resolver.
+   *
+   * @return Subcluster resolver.
+   */
+  public FileSubclusterResolver getSubclusterResolver() {
+    return subclusterResolver;
+  }
+
+  /**
    * Get the RPC monitor and metrics.
    *
    * @return RPC monitor and metrics.
@@ -1349,7 +1383,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
         action, isChecked);
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
     Map<FederationNamespaceInfo, Boolean> results =
-        rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
+        rpcClient.invokeConcurrent(nss, method, true, true, Boolean.class);
 
     // We only report true if all the name spaces are in safe mode
     int numSafemode = 0;
@@ -1369,7 +1403,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
         new Class<?>[] {String.class}, arg);
     final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
     Map<FederationNamespaceInfo, Boolean> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
+        rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
 
     boolean success = true;
     for (boolean s : ret.values()) {
@@ -2070,6 +2104,77 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     return null;
   }
 
+  @Override // NamenodeProtocol
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
+      long minBlockSize) throws IOException {
+    return nnProto.getBlocks(datanode, size, minBlockSize);
+  }
+
+  @Override // NamenodeProtocol
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    return nnProto.getBlockKeys();
+  }
+
+  @Override // NamenodeProtocol
+  public long getTransactionID() throws IOException {
+    return nnProto.getTransactionID();
+  }
+
+  @Override // NamenodeProtocol
+  public long getMostRecentCheckpointTxId() throws IOException {
+    return nnProto.getMostRecentCheckpointTxId();
+  }
+
+  @Override // NamenodeProtocol
+  public CheckpointSignature rollEditLog() throws IOException {
+    return nnProto.rollEditLog();
+  }
+
+  @Override // NamenodeProtocol
+  public NamespaceInfo versionRequest() throws IOException {
+    return nnProto.versionRequest();
+  }
+
+  @Override // NamenodeProtocol
+  public void errorReport(NamenodeRegistration registration, int errorCode,
+      String msg) throws IOException {
+    nnProto.errorReport(registration, errorCode, msg);
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeRegistration registerSubordinateNamenode(
+      NamenodeRegistration registration) throws IOException {
+    return nnProto.registerSubordinateNamenode(registration);
+  }
+
+  @Override // NamenodeProtocol
+  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+      throws IOException {
+    return nnProto.startCheckpoint(registration);
+  }
+
+  @Override // NamenodeProtocol
+  public void endCheckpoint(NamenodeRegistration registration,
+      CheckpointSignature sig) throws IOException {
+    nnProto.endCheckpoint(registration, sig);
+  }
+
+  @Override // NamenodeProtocol
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+      throws IOException {
+    return nnProto.getEditLogManifest(sinceTxId);
+  }
+
+  @Override // NamenodeProtocol
+  public boolean isUpgradeFinalized() throws IOException {
+    return nnProto.isUpgradeFinalized();
+  }
+
+  @Override // NamenodeProtocol
+  public boolean isRollingUpgrade() throws IOException {
+    return nnProto.isRollingUpgrade();
+  }
+
   /**
    * Locate the location with the matching block pool id.
    *

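A note on the boolean.class -> Boolean.class changes in setSafeMode() and restoreFailedStorage(): RouterRpcClient casts each result with clazz.cast(result) (see the singletonMap path in the previous file), a primitive Class object rejects every object, and RPC results arrive autoboxed, which plausibly motivated the change. A minimal illustration of the distinction:

    Object result = Boolean.TRUE;             // what the RPC layer hands back
    Boolean ok = Boolean.class.cast(result);  // succeeds
    // boolean.class.cast(result) would throw ClassCastException:
    // primitive Class objects report isInstance() == false for any object
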
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index c49f90a..0ad8536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -239,6 +239,10 @@ public class MiniRouterDFSCluster {
       }
       return client;
     }
+
+    public Configuration getConf() {
+      return conf;
+    }
   }
 
   /**
@@ -351,6 +355,10 @@ public class MiniRouterDFSCluster {
       }
       return suffix;
     }
+
+    public Configuration getConf() {
+      return conf;
+    }
   }
 
   public MiniRouterDFSCluster(

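The new getConf() accessors let tests build additional protocol proxies against a specific Router or Namenode, as the TestRouterRpc changes below do:

    NamenodeProtocol routerNamenodeProtocol = NameNodeProxies.createProxy(
        router.getConf(), router.getFileSystem().getUri(),
        NamenodeProtocol.class).getProxy();
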
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 2e4b80d..a731648 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
@@ -68,14 +70,18 @@ public class TestConnectionManager {
     Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
 
     ConnectionPool pool1 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
     addConnectionsToPool(pool1, 9, 4);
-    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1);
+    poolMap.put(
+        new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
+        pool1);
 
     ConnectionPool pool2 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10);
+        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class);
     addConnectionsToPool(pool2, 10, 10);
-    poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2);
+    poolMap.put(
+        new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
+        pool2);
 
     checkPoolConnections(TEST_USER1, 9, 4);
     checkPoolConnections(TEST_USER2, 10, 10);
@@ -94,9 +100,11 @@ public class TestConnectionManager {
 
     // Make sure the number of connections doesn't go below minSize
     ConnectionPool pool3 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10);
+        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class);
     addConnectionsToPool(pool3, 8, 0);
-    poolMap.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS), pool3);
+    poolMap.put(
+        new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
+        pool3);
     checkPoolConnections(TEST_USER3, 10, 0);
     for (int i = 0; i < 10; i++) {
       connManager.cleanup(pool3);
@@ -119,9 +127,41 @@ public class TestConnectionManager {
     int activeConns = 5;
 
     ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
     addConnectionsToPool(pool, totalConns, activeConns);
-    poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool);
+    poolMap.put(
+        new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
+        pool);
+
+    // All remaining connections should be usable
+    final int remainingSlots = totalConns - activeConns;
+    for (int i = 0; i < remainingSlots; i++) {
+      ConnectionContext cc = pool.getConnection();
+      assertTrue(cc.isUsable());
+      cc.getClient();
+      activeConns++;
+    }
+
+    checkPoolConnections(TEST_USER1, totalConns, activeConns);
+
+    // Ask for more and this returns an active connection
+    ConnectionContext cc = pool.getConnection();
+    assertTrue(cc.isActive());
+  }
+
+  @Test
+  public void getGetConnectionNamenodeProtocol() throws Exception {
+    Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
+    final int totalConns = 10;
+    int activeConns = 5;
+
+    ConnectionPool pool = new ConnectionPool(
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class);
+    addConnectionsToPool(pool, totalConns, activeConns);
+    poolMap.put(
+        new ConnectionPoolId(
+            TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class),
+        pool);
 
     // All remaining connections should be usable
     final int remainingSlots = totalConns - activeConns;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2be64eb2/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 5014880..cc74098 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -70,16 +71,22 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
@@ -133,6 +140,11 @@ public class TestRouterRpc {
   /** Client interface to the Namenode. */
   private ClientProtocol nnProtocol;
 
+  /** NameNodeProtocol interface to the Router. */
+  private NamenodeProtocol routerNamenodeProtocol;
+  /** NameNodeProtocol interface to the Namenode. */
+  private NamenodeProtocol nnNamenodeProtocol;
+
   /** Filesystem interface to the Router. */
   private FileSystem routerFS;
   /** Filesystem interface to the Namenode. */
@@ -189,22 +201,18 @@ public class TestRouterRpc {
     // Wait to ensure NN has fully created its test directories
     Thread.sleep(100);
 
-    // Default namenode and random router for this test
-    this.router = cluster.getRandomRouter();
-    this.ns = cluster.getNameservices().get(0);
-    this.namenode = cluster.getNamenode(ns, null);
-
-    // Handles to the ClientProtocol interface
-    this.routerProtocol = router.getClient().getNamenode();
-    this.nnProtocol = namenode.getClient().getNamenode();
+    // Random router for this test
+    RouterContext rndRouter = cluster.getRandomRouter();
+    this.setRouter(rndRouter);
 
-    // Handles to the filesystem client
-    this.nnFS = namenode.getFileSystem();
-    this.routerFS = router.getFileSystem();
+    // Pick a namenode for this test
+    String ns0 = cluster.getNameservices().get(0);
+    this.setNs(ns0);
+    this.setNamenode(cluster.getNamenode(ns0, null));
 
     // Create a test file on the NN
-    Random r = new Random();
-    String randomFile = "testfile-" + r.nextInt();
+    Random rnd = new Random();
+    String randomFile = "testfile-" + rnd.nextInt();
     this.nnFile =
         cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
     this.routerFile =
@@ -245,6 +253,8 @@ public class TestRouterRpc {
     this.router = r;
     this.routerProtocol = r.getClient().getNamenode();
     this.routerFS = r.getFileSystem();
+    this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(),
+        router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
   }
 
   protected FileSystem getRouterFileSystem() {
@@ -288,6 +298,12 @@ public class TestRouterRpc {
     this.namenode = nn;
     this.nnProtocol = nn.getClient().getNamenode();
     this.nnFS = nn.getFileSystem();
+
+    // Namenode from the default namespace
+    String ns0 = cluster.getNameservices().get(0);
+    NamenodeContext nn0 = cluster.getNamenode(ns0, null);
+    this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(),
+        nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
   }
 
   protected String getNs() {
@@ -933,6 +949,79 @@ public class TestRouterRpc {
   }
 
   @Test
+  public void testProxyVersionRequest() throws Exception {
+    NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
+    NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
+    assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID());
+    assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID());
+    assertEquals(nnVersion.getClusterID(), rVersion.getClusterID());
+    assertEquals(nnVersion.getLayoutVersion(), rVersion.getLayoutVersion());
+    assertEquals(nnVersion.getCTime(), rVersion.getCTime());
+  }
+
+  @Test
+  public void testProxyGetBlockKeys() throws Exception {
+    ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
+    ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
+    assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey());
+    assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval());
+    assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime());
+  }
+
+  @Test
+  public void testProxyGetBlocks() throws Exception {
+    // Get datanodes
+    DatanodeInfo[] dns =
+        routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    DatanodeInfo dn0 = dns[0];
+
+    // Verify that getting blocks for that datanode works
+    BlocksWithLocations routerBlockLocations =
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0);
+    BlocksWithLocations nnBlockLocations =
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0);
+    BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
+    BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
+    assertEquals(nnBlocks.length, routerBlocks.length);
+    for (int i = 0; i < routerBlocks.length; i++) {
+      assertEquals(
+          nnBlocks[i].getBlock().getBlockId(),
+          routerBlocks[i].getBlock().getBlockId());
+    }
+  }
+
+  @Test
+  public void testProxyGetTransactionID() throws IOException {
+    long routerTransactionID = routerNamenodeProtocol.getTransactionID();
+    long nnTransactionID = nnNamenodeProtocol.getTransactionID();
+    assertEquals(nnTransactionID, routerTransactionID);
+  }
+
+  @Test
+  public void testProxyGetMostRecentCheckpointTxId() throws IOException {
+    long routerCheckPointId =
+        routerNamenodeProtocol.getMostRecentCheckpointTxId();
+    long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
+    assertEquals(nnCheckPointId, routerCheckPointId);
+  }
+
+  @Test
+  public void testProxySetSafemode() throws Exception {
+    boolean routerSafemode =
+        routerProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
+    boolean nnSafemode =
+        nnProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
+    assertEquals(nnSafemode, routerSafemode);
+  }
+
+  @Test
+  public void testProxyRestoreFailedStorage() throws Exception {
+    boolean routerSuccess = routerProtocol.restoreFailedStorage("check");
+    boolean nnSuccess = nnProtocol.restoreFailedStorage("check");
+    assertEquals(nnSuccess, routerSuccess);
+  }
+
+  @Test
   public void testErasureCoding() throws IOException {
 
     LOG.info("List the available erasurce coding policies");

