This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new d9b3833 HIVE-20841: LLAP: Make dynamic ports configurable (Prasanth Jayachandran reviewed by Sergey Shelukhin) d9b3833 is described below commit d9b3833e0c7b0ee008d1dd39bacc8f758170156f Author: Prasanth Jayachandran <prasan...@apache.org> AuthorDate: Tue Feb 12 00:28:09 2019 -0800 HIVE-20841: LLAP: Make dynamic ports configurable (Prasanth Jayachandran reviewed by Sergey Shelukhin) --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 4 ++++ .../hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java | 8 ++++++-- .../apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java | 6 +++++- .../hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java | 2 +- .../hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java | 7 +++++-- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a04ef38..3bb482f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4010,6 +4010,8 @@ public class HiveConf extends Configuration { LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5, "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"), + LLAP_PLUGIN_RPC_PORT("hive.llap.plugin.rpc.port", 0, + "Port to use for LLAP plugin rpc server"), LLAP_PLUGIN_RPC_NUM_HANDLERS("hive.llap.plugin.rpc.num.handlers", 1, "Number of RPC handlers for AM LLAP plugin endpoint."), LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "", @@ -4182,6 +4184,8 @@ public class HiveConf extends Configuration { "Sleep duration (in milliseconds) to wait before retrying on error when obtaining a\n" + "connection to LLAP daemon from Tez AM.", "llap.task.communicator.connection.sleep-between-retries-millis"), + LLAP_TASK_UMBILICAL_SERVER_PORT("hive.llap.daemon.umbilical.port", 0, + 
"LLAP task umbilical server RPC port"), LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web UI port.", "llap.daemon.service.port"), LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false, diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java index 89cb6fb..a16c0af 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.ipc.RPC; @@ -53,11 +54,14 @@ public class LlapTaskUmbilicalServer { public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException { jobTokenSecretManager = new JobTokenSecretManager(); - + int umbilicalPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT); + if (umbilicalPort <= 0) { + umbilicalPort = 0; + } server = new RPC.Builder(conf) .setProtocol(LlapTaskUmbilicalProtocol.class) .setBindAddress("0.0.0.0") - .setPort(0) + .setPort(umbilicalPort) .setInstance(umbilical) .setNumHandlers(numHandlers) .setSecretManager(jobTokenSecretManager).build(); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 5d4ce22..2dfd359 100644 --- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -254,10 +254,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { int numHandlers = HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT); + int umbilicalPort = HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT); + if (umbilicalPort <= 0) { + umbilicalPort = 0; + } server = new RPC.Builder(conf) .setProtocol(LlapTaskUmbilicalProtocol.class) .setBindAddress("0.0.0.0") - .setPort(0) + .setPort(umbilicalPort) .setInstance(umbilical) .setNumHandlers(numHandlers) .setSecretManager(jobTokenSecretManager).build(); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 8217964..0ea4c09 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -317,7 +317,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { serializedToken = jobIdForToken = null; } pluginEndpoint = new LlapPluginServerImpl(sm, - HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this); + HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this, HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT)); } else { serializedToken = jobIdForToken = null; pluginEndpoint = null; diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java index e9a011a..6e6785e 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java +++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java @@ -42,13 +42,16 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP private final int numHandlers; private final LlapTaskSchedulerService parent; private final AtomicReference<InetSocketAddress> bindAddress = new AtomicReference<>(); + private final int port; public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager, - int numHandlers, LlapTaskSchedulerService parent) { + int numHandlers, LlapTaskSchedulerService parent, int port) { super("LlapPluginServerImpl"); this.secretManager = secretManager; this.numHandlers = numHandlers; this.parent = parent; + this.port = port <= 0 ? 0 : port; + LOG.info("Llap plugin server using port: {} #handlers: {}", port, numHandlers); } @Override @@ -63,7 +66,7 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP final Configuration conf = getConfig(); final BlockingService daemonImpl = LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this); - server = LlapUtil.startProtocolServer(0, numHandlers, bindAddress , conf, daemonImpl, + server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , conf, daemonImpl, LlapPluginProtocolPB.class, secretManager, new LlapPluginPolicyProvider(), ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY); LOG.info("Starting the plugin endpoint on port " + bindAddress.get().getPort());