Repository: incubator-crail Updated Branches: refs/heads/master 7ea8753a6 -> d58abd828
Narpc: enable multiple core dispatcher Enable multi core dispatcher if configured to use NaRPC (RPC and Storage). Fixes CRAIL-9 JIRA ticket Close #5 Signed-off-by: Jonas Pfefferle <peppe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/d58abd82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/d58abd82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/d58abd82 Branch: refs/heads/master Commit: d58abd828245815f450d2342ff5df6505ea704e6 Parents: 7ea8753 Author: Patrick Stuedi <s...@zurich.ibm.com> Authored: Fri Feb 23 13:59:02 2018 +0100 Committer: Jonas Pfefferle <peppe...@apache.org> Committed: Fri Feb 23 14:51:18 2018 +0100 ---------------------------------------------------------------------- pom.xml | 2 +- .../crail/namenode/rpc/tcp/TcpNameNodeServer.java | 2 +- .../crail/namenode/rpc/tcp/TcpRpcConstants.java | 7 +++++++ .../crail/storage/tcp/TcpStorageConstants.java | 15 +++++++++++++++ .../apache/crail/storage/tcp/TcpStorageServer.java | 2 +- 5 files changed, 25 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index eb64c90..720f078 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ <dependency> <groupId>com.ibm.narpc</groupId> <artifactId>narpc</artifactId> - <version>1.0</version> + <version>1.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java ---------------------------------------------------------------------- diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java index 7c2f78d..60b1833 100644 --- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java +++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeServer.java @@ -49,7 +49,7 @@ public class TcpNameNodeServer extends RpcServer { TcpRpcConstants.verify(); this.serverGroup = new NaRPCServerGroup<TcpNameNodeRequest, TcpNameNodeResponse>( dispatcher, TcpRpcConstants.NAMENODE_TCP_QUEUEDEPTH, - TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true); + TcpRpcConstants.NAMENODE_TCP_MESSAGESIZE, true, TcpRpcConstants.NAMENODE_TCP_CORES); this.serverEndpoint = serverGroup.createServerEndpoint(); InetSocketAddress inetSocketAddress = CrailUtils.getNameNodeAddress(); serverEndpoint.bind(inetSocketAddress); http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java ---------------------------------------------------------------------- diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java index 407d366..05a0b10 100644 --- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java +++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConstants.java @@ -34,6 +34,9 @@ public class TcpRpcConstants { public static final String NAMENODE_TCP_MESSAGESIZE_KEY = "crail.namenode.tcp.messageSize"; public static int NAMENODE_TCP_MESSAGESIZE = 512; + public static final String NAMENODE_TCP_CORES_KEY = "crail.namenode.tcp.cores"; + public static int NAMENODE_TCP_CORES = 1; + public static void updateConstants(CrailConfiguration conf){ if (conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY) != null) { NAMENODE_TCP_QUEUEDEPTH = Integer.parseInt(conf.get(NAMENODE_TCP_QUEUEDEPTH_KEY)); @@ -41,6 +44,9 @@ public class TcpRpcConstants { if (conf.get(NAMENODE_TCP_MESSAGESIZE_KEY) != null) { NAMENODE_TCP_MESSAGESIZE = Integer.parseInt(conf.get(NAMENODE_TCP_MESSAGESIZE_KEY)); } + if (conf.get(NAMENODE_TCP_CORES_KEY) != null) { + NAMENODE_TCP_CORES = Integer.parseInt(conf.get(NAMENODE_TCP_CORES_KEY)); + } } public static void verify() throws IOException { @@ -49,5 +55,6 @@ public class TcpRpcConstants { public static void printConf(Logger logger) { LOG.info(NAMENODE_TCP_QUEUEDEPTH_KEY + " " + NAMENODE_TCP_QUEUEDEPTH); LOG.info(NAMENODE_TCP_MESSAGESIZE_KEY + " " + NAMENODE_TCP_MESSAGESIZE); + LOG.info(NAMENODE_TCP_CORES_KEY + " " + NAMENODE_TCP_CORES); } } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java ---------------------------------------------------------------------- diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java index 55c882e..885f8c1 100644 --- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java +++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageConstants.java @@ -53,11 +53,16 @@ public class TcpStorageConstants { public static final String STORAGE_TCP_QUEUE_DEPTH_KEY = "crail.storage.tcp.queuedepth"; public static int STORAGE_TCP_QUEUE_DEPTH = 16; + public static final String STORAGE_TCP_CORES_KEY = "crail.storage.tcp.cores"; + public static int STORAGE_TCP_CORES = 1; + public static void init(CrailConfiguration conf, String[] args) throws Exception { if (args != null) { Option portOption = Option.builder("p").desc("port to start server on").hasArg().build(); + Option coresOption = Option.builder("c").desc("number of cores to use").hasArg().build(); Options options = new Options(); options.addOption(portOption); + options.addOption(coresOption); CommandLineParser parser = new DefaultParser(); try { @@ -67,6 +72,12 @@ public class TcpStorageConstants { LOG.info("using custom port " + port); conf.set(TcpStorageConstants.STORAGE_TCP_PORT_KEY, port); } + if (line.hasOption(coresOption.getOpt())) { + String cores = line.getOptionValue(coresOption.getOpt()); + LOG.info("number of cores used is " + cores); + conf.set(TcpStorageConstants.STORAGE_TCP_CORES_KEY, cores); + } + } catch (ParseException e) { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("RDMA storage tier", options); @@ -95,6 +106,9 @@ public class TcpStorageConstants { } if (conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY) != null) { STORAGE_TCP_QUEUE_DEPTH = Integer.parseInt(conf.get(STORAGE_TCP_QUEUE_DEPTH_KEY)); + } + if (conf.get(STORAGE_TCP_CORES_KEY) != null) { + STORAGE_TCP_CORES = Integer.parseInt(conf.get(STORAGE_TCP_CORES_KEY)); } } @@ -105,6 +119,7 @@ public class TcpStorageConstants { logger.info(STORAGE_TCP_ALLOCATION_SIZE_KEY + " " + STORAGE_TCP_ALLOCATION_SIZE); logger.info(STORAGE_TCP_DATA_PATH_KEY + " " + STORAGE_TCP_DATA_PATH); logger.info(STORAGE_TCP_QUEUE_DEPTH_KEY + " " + STORAGE_TCP_QUEUE_DEPTH); + logger.info(STORAGE_TCP_CORES_KEY + " " + STORAGE_TCP_CORES); } } http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/d58abd82/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java ---------------------------------------------------------------------- diff --git a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java index 9ed2260..f415f07 100644 --- a/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java +++ b/storage-narpc/src/main/java/org/apache/crail/storage/tcp/TcpStorageServer.java @@ -61,7 +61,7 @@ public class TcpStorageServer implements Runnable, StorageServer, NaRPCService<T public void init(CrailConfiguration conf, String[] args) throws Exception { TcpStorageConstants.init(conf, args); - this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false); + this.serverGroup = new NaRPCServerGroup<TcpStorageRequest, TcpStorageResponse>(this, TcpStorageConstants.STORAGE_TCP_QUEUE_DEPTH, (int) CrailConstants.BLOCK_SIZE*2, false, TcpStorageConstants.STORAGE_TCP_CORES); this.serverEndpoint = serverGroup.createServerEndpoint(); this.address = getDataNodeAddress(); serverEndpoint.bind(address);