[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

2017-08-12 Thread inigoiri
HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8e03592
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8e03592
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8e03592

Branch: refs/heads/HDFS-10467
Commit: b8e0359289104602bda8991c61b4b98cc9d3a8b7
Parents: fe3672c
Author: Inigo Goiri 
Authored: Thu May 11 09:57:03 2017 -0700
Committer: Inigo Goiri 
Committed: Sat Aug 12 09:36:24 2017 -0700

--
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   38 +
 .../resolver/FederationNamespaceInfo.java   |   46 +-
 .../federation/resolver/RemoteLocation.java |   46 +-
 .../federation/router/ConnectionContext.java|  104 +
 .../federation/router/ConnectionManager.java|  408 
 .../federation/router/ConnectionPool.java   |  314 +++
 .../federation/router/ConnectionPoolId.java |  117 ++
 .../router/RemoteLocationContext.java   |   38 +-
 .../server/federation/router/RemoteMethod.java  |  164 ++
 .../server/federation/router/RemoteParam.java   |   71 +
 .../hdfs/server/federation/router/Router.java   |   58 +-
 .../federation/router/RouterRpcClient.java  |  856 
 .../federation/router/RouterRpcServer.java  | 1867 +-
 .../src/main/resources/hdfs-default.xml |   95 +
 .../server/federation/FederationTestUtils.java  |   80 +-
 .../hdfs/server/federation/MockResolver.java|   90 +-
 .../server/federation/RouterConfigBuilder.java  |   20 +-
 .../server/federation/RouterDFSCluster.java |  535 +++--
 .../server/federation/router/TestRouter.java|   31 +-
 .../server/federation/router/TestRouterRpc.java |  869 
 .../router/TestRouterRpcMultiDestination.java   |  216 ++
 21 files changed, 5675 insertions(+), 388 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8e03592/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 2b6d0e8..ca24fd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1102,6 +1102,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // HDFS Router-based federation
   public static final String FEDERATION_ROUTER_PREFIX =
   "dfs.federation.router.";
+  public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
+  FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
+  public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
+  FEDERATION_ROUTER_PREFIX + "handler.count";
+  public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
+  FEDERATION_ROUTER_PREFIX + "reader.queue.size";
+  public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_READER_COUNT_KEY =
+  FEDERATION_ROUTER_PREFIX + "reader.count";
+  public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
+  public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
+  FEDERATION_ROUTER_PREFIX + "handler.queue.size";
+  public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
+  FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
+  public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
+  public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
+  FEDERATION_ROUTER_PREFIX + "rpc-address";
+  public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
+  "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
+  public static final String DFS_ROUTER_RPC_ENABLE =
+  FEDERATION_ROUTER_PREFIX + "rpc.enable";
+  public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+
+  // HDFS Router NN client
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
+  FEDERATION_ROUTER_PREFIX + "connection.pool-size";
+  public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
+  64;
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
+  FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
+  public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
+  TimeUnit.MINUTES.toMillis(1);
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
+  FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
+  public 

[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

2017-07-28 Thread inigoiri
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a60031c7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index ee6f57d..2875750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time;
 
 /**
  * In-memory cache/mock of a namenode and file resolver. Stores the most
- * recently updated NN information for each nameservice and block pool. Also
+ * recently updated NN information for each nameservice and block pool. It also
  * stores a virtual mount table for resolving global namespace paths to local NN
  * paths.
  */
@@ -51,82 +51,93 @@ public class MockResolver
 implements ActiveNamenodeResolver, FileSubclusterResolver {
 
   private Map resolver =
-  new HashMap();
-  private Map locations =
-  new HashMap();
-  private Set namespaces =
-  new HashSet();
+  new HashMap<>();
+  private Map locations = new HashMap<>();
+  private Set namespaces = new HashSet<>();
   private String defaultNamespace = null;
 
+
   public MockResolver(Configuration conf, StateStoreService store) {
 this.cleanRegistrations();
   }
 
-  public void addLocation(String mount, String nameservice, String location) {
-RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
-List locationsList = locations.get(mount);
+  public void addLocation(String mount, String nsId, String location) {
+List locationsList = this.locations.get(mount);
 if (locationsList == null) {
-  locationsList = new LinkedList();
-  locations.put(mount, locationsList);
+  locationsList = new LinkedList<>();
+  this.locations.put(mount, locationsList);
 }
+
+final RemoteLocation remoteLocation = new RemoteLocation(nsId, location);
 if (!locationsList.contains(remoteLocation)) {
   locationsList.add(remoteLocation);
 }
 
 if (this.defaultNamespace == null) {
-  this.defaultNamespace = nameservice;
+  this.defaultNamespace = nsId;
 }
   }
 
   public synchronized void cleanRegistrations() {
-this.resolver =
-new HashMap();
-this.namespaces = new HashSet();
+this.resolver = new HashMap<>();
+this.namespaces = new HashSet<>();
   }
 
   @Override
   public void updateActiveNamenode(
-  String ns, InetSocketAddress successfulAddress) {
+  String nsId, InetSocketAddress successfulAddress) {
 
 String address = successfulAddress.getHostName() + ":" +
 successfulAddress.getPort();
-String key = ns;
+String key = nsId;
 if (key != null) {
   // Update the active entry
   @SuppressWarnings("unchecked")
-  List iterator =
-  (List) resolver.get(key);
-  for (FederationNamenodeContext namenode : iterator) {
+  List namenodes =
+  (List) this.resolver.get(key);
+  for (FederationNamenodeContext namenode : namenodes) {
 if (namenode.getRpcAddress().equals(address)) {
   MockNamenodeContext nn = (MockNamenodeContext) namenode;
   nn.setState(FederationNamenodeServiceState.ACTIVE);
   break;
 }
   }
-  Collections.sort(iterator, new NamenodePriorityComparator());
+  // This operation modifies the list so we need to be careful
+  synchronized(namenodes) {
+Collections.sort(namenodes, new NamenodePriorityComparator());
+  }
 }
   }
 
   @Override
   public List
   getNamenodesForNameserviceId(String nameserviceId) {
-return resolver.get(nameserviceId);
+// Return a copy of the list because it is updated periodically
+List namenodes =
+this.resolver.get(nameserviceId);
+return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   @Override
   public List getNamenodesForBlockPoolId(
   String blockPoolId) {
-return resolver.get(blockPoolId);
+// Return a copy of the list because it is updated periodically
+List namenodes =
+this.resolver.get(blockPoolId);
+return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   private static class MockNamenodeContext
   implements FederationNamenodeContext {
+
+private String namenodeId;
+private String nameserviceId;
+
 private String webAddress;
 private String rpcAddress;
 private String serviceAddress;
 private String lifelineAddress;
-private String namenodeId;
-private String 

[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

2017-06-21 Thread inigoiri
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85bcc8c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index ee6f57d..2875750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time;
 
 /**
  * In-memory cache/mock of a namenode and file resolver. Stores the most
- * recently updated NN information for each nameservice and block pool. Also
+ * recently updated NN information for each nameservice and block pool. It also
  * stores a virtual mount table for resolving global namespace paths to local NN
  * paths.
  */
@@ -51,82 +51,93 @@ public class MockResolver
 implements ActiveNamenodeResolver, FileSubclusterResolver {
 
   private Map resolver =
-  new HashMap();
-  private Map locations =
-  new HashMap();
-  private Set namespaces =
-  new HashSet();
+  new HashMap<>();
+  private Map locations = new HashMap<>();
+  private Set namespaces = new HashSet<>();
   private String defaultNamespace = null;
 
+
   public MockResolver(Configuration conf, StateStoreService store) {
 this.cleanRegistrations();
   }
 
-  public void addLocation(String mount, String nameservice, String location) {
-RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
-List locationsList = locations.get(mount);
+  public void addLocation(String mount, String nsId, String location) {
+List locationsList = this.locations.get(mount);
 if (locationsList == null) {
-  locationsList = new LinkedList();
-  locations.put(mount, locationsList);
+  locationsList = new LinkedList<>();
+  this.locations.put(mount, locationsList);
 }
+
+final RemoteLocation remoteLocation = new RemoteLocation(nsId, location);
 if (!locationsList.contains(remoteLocation)) {
   locationsList.add(remoteLocation);
 }
 
 if (this.defaultNamespace == null) {
-  this.defaultNamespace = nameservice;
+  this.defaultNamespace = nsId;
 }
   }
 
   public synchronized void cleanRegistrations() {
-this.resolver =
-new HashMap();
-this.namespaces = new HashSet();
+this.resolver = new HashMap<>();
+this.namespaces = new HashSet<>();
   }
 
   @Override
   public void updateActiveNamenode(
-  String ns, InetSocketAddress successfulAddress) {
+  String nsId, InetSocketAddress successfulAddress) {
 
 String address = successfulAddress.getHostName() + ":" +
 successfulAddress.getPort();
-String key = ns;
+String key = nsId;
 if (key != null) {
   // Update the active entry
   @SuppressWarnings("unchecked")
-  List iterator =
-  (List) resolver.get(key);
-  for (FederationNamenodeContext namenode : iterator) {
+  List namenodes =
+  (List) this.resolver.get(key);
+  for (FederationNamenodeContext namenode : namenodes) {
 if (namenode.getRpcAddress().equals(address)) {
   MockNamenodeContext nn = (MockNamenodeContext) namenode;
   nn.setState(FederationNamenodeServiceState.ACTIVE);
   break;
 }
   }
-  Collections.sort(iterator, new NamenodePriorityComparator());
+  // This operation modifies the list so we need to be careful
+  synchronized(namenodes) {
+Collections.sort(namenodes, new NamenodePriorityComparator());
+  }
 }
   }
 
   @Override
   public List
   getNamenodesForNameserviceId(String nameserviceId) {
-return resolver.get(nameserviceId);
+// Return a copy of the list because it is updated periodically
+List namenodes =
+this.resolver.get(nameserviceId);
+return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   @Override
   public List getNamenodesForBlockPoolId(
   String blockPoolId) {
-return resolver.get(blockPoolId);
+// Return a copy of the list because it is updated periodically
+List namenodes =
+this.resolver.get(blockPoolId);
+return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   private static class MockNamenodeContext
   implements FederationNamenodeContext {
+
+private String namenodeId;
+private String nameserviceId;
+
 private String webAddress;
 private String rpcAddress;
 private String serviceAddress;
 private String lifelineAddress;
-private String namenodeId;
-private String