[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.
HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8e03592 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8e03592 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8e03592 Branch: refs/heads/HDFS-10467 Commit: b8e0359289104602bda8991c61b4b98cc9d3a8b7 Parents: fe3672c Author: Inigo GoiriAuthored: Thu May 11 09:57:03 2017 -0700 Committer: Inigo Goiri Committed: Sat Aug 12 09:36:24 2017 -0700 -- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 38 + .../resolver/FederationNamespaceInfo.java | 46 +- .../federation/resolver/RemoteLocation.java | 46 +- .../federation/router/ConnectionContext.java| 104 + .../federation/router/ConnectionManager.java| 408 .../federation/router/ConnectionPool.java | 314 +++ .../federation/router/ConnectionPoolId.java | 117 ++ .../router/RemoteLocationContext.java | 38 +- .../server/federation/router/RemoteMethod.java | 164 ++ .../server/federation/router/RemoteParam.java | 71 + .../hdfs/server/federation/router/Router.java | 58 +- .../federation/router/RouterRpcClient.java | 856 .../federation/router/RouterRpcServer.java | 1867 +- .../src/main/resources/hdfs-default.xml | 95 + .../server/federation/FederationTestUtils.java | 80 +- .../hdfs/server/federation/MockResolver.java| 90 +- .../server/federation/RouterConfigBuilder.java | 20 +- .../server/federation/RouterDFSCluster.java | 535 +++-- .../server/federation/router/TestRouter.java| 31 +- .../server/federation/router/TestRouterRpc.java | 869 .../router/TestRouterRpcMultiDestination.java | 216 ++ 21 files changed, 5675 insertions(+), 388 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8e03592/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2b6d0e8..ca24fd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1102,6 +1102,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // HDFS Router-based federation public static final String FEDERATION_ROUTER_PREFIX = "dfs.federation.router."; + public static final String DFS_ROUTER_DEFAULT_NAMESERVICE = + FEDERATION_ROUTER_PREFIX + "default.nameserviceId"; + public static final String DFS_ROUTER_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "handler.count"; + public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "reader.queue.size"; + public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_READER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "reader.count"; + public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1; + public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "handler.queue.size"; + public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_RPC_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-bind-host"; + public static final int DFS_ROUTER_RPC_PORT_DEFAULT = ; + public static final String DFS_ROUTER_RPC_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-address"; + public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT; + public static final String DFS_ROUTER_RPC_ENABLE = + FEDERATION_ROUTER_PREFIX + "rpc.enable"; + public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + + // HDFS Router NN client + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = + FEDERATION_ROUTER_PREFIX + "connection.pool-size"; + public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = + 64; + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN = + FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms"; + public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS = + FEDERATION_ROUTER_PREFIX + "connection.clean.ms"; + public
[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a60031c7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index ee6f57d..2875750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time; /** * In-memory cache/mock of a namenode and file resolver. Stores the most - * recently updated NN information for each nameservice and block pool. Also + * recently updated NN information for each nameservice and block pool. It also * stores a virtual mount table for resolving global namespace paths to local NN * paths. */ @@ -51,82 +51,93 @@ public class MockResolver implements ActiveNamenodeResolver, FileSubclusterResolver { private Mapresolver = - new HashMap (); - private Map locations = - new HashMap (); - private Set namespaces = - new HashSet(); + new HashMap<>(); + private Map locations = new HashMap<>(); + private Set namespaces = new HashSet<>(); private String defaultNamespace = null; + public MockResolver(Configuration conf, StateStoreService store) { this.cleanRegistrations(); } - public void addLocation(String mount, String nameservice, String location) { -RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); -List locationsList = locations.get(mount); + public void addLocation(String mount, String nsId, String location) { +List locationsList = this.locations.get(mount); if (locationsList == null) { - locationsList = new LinkedList(); - locations.put(mount, locationsList); + locationsList = new LinkedList<>(); + this.locations.put(mount, locationsList); } + +final RemoteLocation remoteLocation = new RemoteLocation(nsId, location); if (!locationsList.contains(remoteLocation)) { locationsList.add(remoteLocation); } if (this.defaultNamespace == null) { - this.defaultNamespace = nameservice; + this.defaultNamespace = nsId; } } public synchronized void cleanRegistrations() { -this.resolver = -new HashMap (); -this.namespaces = new HashSet(); +this.resolver = new HashMap<>(); +this.namespaces = new HashSet<>(); } @Override public void updateActiveNamenode( - String ns, InetSocketAddress successfulAddress) { + String nsId, InetSocketAddress successfulAddress) { String address = successfulAddress.getHostName() + ":" + successfulAddress.getPort(); -String key = ns; +String key = nsId; if (key != null) { // Update the active entry @SuppressWarnings("unchecked") - List iterator = - (List) resolver.get(key); - for (FederationNamenodeContext namenode : iterator) { + List namenodes = + (List) this.resolver.get(key); + for (FederationNamenodeContext namenode : namenodes) { if (namenode.getRpcAddress().equals(address)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; nn.setState(FederationNamenodeServiceState.ACTIVE); break; } } - Collections.sort(iterator, new NamenodePriorityComparator()); + // This operation modifies the list so we need to be careful + synchronized(namenodes) { +Collections.sort(namenodes, new NamenodePriorityComparator()); + } } } @Override public List getNamenodesForNameserviceId(String nameserviceId) { -return resolver.get(nameserviceId); +// Return a copy of the list because it is updated periodically +List namenodes = +this.resolver.get(nameserviceId); +return Collections.unmodifiableList(new ArrayList<>(namenodes)); } @Override public List getNamenodesForBlockPoolId( String blockPoolId) { -return resolver.get(blockPoolId); +// Return a copy of the list because it is updated periodically +List namenodes = +this.resolver.get(blockPoolId); +return Collections.unmodifiableList(new ArrayList<>(namenodes)); } private static class MockNamenodeContext implements FederationNamenodeContext { + +private String namenodeId; +private String nameserviceId; + private String webAddress; private String rpcAddress; private String serviceAddress; private String lifelineAddress; -private String namenodeId; -private String
[42/50] [abbrv] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b85bcc8c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index ee6f57d..2875750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time; /** * In-memory cache/mock of a namenode and file resolver. Stores the most - * recently updated NN information for each nameservice and block pool. Also + * recently updated NN information for each nameservice and block pool. It also * stores a virtual mount table for resolving global namespace paths to local NN * paths. */ @@ -51,82 +51,93 @@ public class MockResolver implements ActiveNamenodeResolver, FileSubclusterResolver { private Mapresolver = - new HashMap (); - private Map locations = - new HashMap (); - private Set namespaces = - new HashSet(); + new HashMap<>(); + private Map locations = new HashMap<>(); + private Set namespaces = new HashSet<>(); private String defaultNamespace = null; + public MockResolver(Configuration conf, StateStoreService store) { this.cleanRegistrations(); } - public void addLocation(String mount, String nameservice, String location) { -RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); -List locationsList = locations.get(mount); + public void addLocation(String mount, String nsId, String location) { +List locationsList = this.locations.get(mount); if (locationsList == null) { - locationsList = new LinkedList(); - locations.put(mount, locationsList); + locationsList = new LinkedList<>(); + this.locations.put(mount, locationsList); } + +final RemoteLocation remoteLocation = new RemoteLocation(nsId, location); if (!locationsList.contains(remoteLocation)) { locationsList.add(remoteLocation); } if (this.defaultNamespace == null) { - this.defaultNamespace = nameservice; + this.defaultNamespace = nsId; } } public synchronized void cleanRegistrations() { -this.resolver = -new HashMap (); -this.namespaces = new HashSet(); +this.resolver = new HashMap<>(); +this.namespaces = new HashSet<>(); } @Override public void updateActiveNamenode( - String ns, InetSocketAddress successfulAddress) { + String nsId, InetSocketAddress successfulAddress) { String address = successfulAddress.getHostName() + ":" + successfulAddress.getPort(); -String key = ns; +String key = nsId; if (key != null) { // Update the active entry @SuppressWarnings("unchecked") - List iterator = - (List) resolver.get(key); - for (FederationNamenodeContext namenode : iterator) { + List namenodes = + (List) this.resolver.get(key); + for (FederationNamenodeContext namenode : namenodes) { if (namenode.getRpcAddress().equals(address)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; nn.setState(FederationNamenodeServiceState.ACTIVE); break; } } - Collections.sort(iterator, new NamenodePriorityComparator()); + // This operation modifies the list so we need to be careful + synchronized(namenodes) { +Collections.sort(namenodes, new NamenodePriorityComparator()); + } } } @Override public List getNamenodesForNameserviceId(String nameserviceId) { -return resolver.get(nameserviceId); +// Return a copy of the list because it is updated periodically +List namenodes = +this.resolver.get(nameserviceId); +return Collections.unmodifiableList(new ArrayList<>(namenodes)); } @Override public List getNamenodesForBlockPoolId( String blockPoolId) { -return resolver.get(blockPoolId); +// Return a copy of the list because it is updated periodically +List namenodes = +this.resolver.get(blockPoolId); +return Collections.unmodifiableList(new ArrayList<>(namenodes)); } private static class MockNamenodeContext implements FederationNamenodeContext { + +private String namenodeId; +private String nameserviceId; + private String webAddress; private String rpcAddress; private String serviceAddress; private String lifelineAddress; -private String namenodeId; -private String