This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f087eac  HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if 
pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
f087eac is described below

commit f087eac18a64f97e890427c5bdeff72269a86b55
Author: Prasanth Jayachandran <prasan...@apache.org>
AuthorDate: Wed Feb 26 15:12:33 2020 -0800

    HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts 
in k8s (Prasanth Jayachandran reviewed by Gopal V)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java    |  3 +++
 .../hive/llap/daemon/impl/ContainerRunnerImpl.java    | 19 ++++++++++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e419dc5..bfc2695 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4380,6 +4380,9 @@ public class HiveConf extends Configuration {
       "considering the AM to be dead.", 
"llap.am.liveness.connection.timeout-millis"),
     LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", true,
         "Whether to use FQDN of the AM machine when submitting work to LLAP."),
+    LLAP_DAEMON_EXEC_USE_FQDN("hive.llap.exec.use.fqdn", true,
+      "On non-kerberized clusters, where the hostnames are stable but ip 
address changes, setting this config\n" +
+        " to false will use ip address of llap daemon in execution context 
instead of FQDN"),
     // Not used yet - since the Writable RPC engine does not support this 
policy.
     LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
       "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2ae7871..6a13b55 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,6 +15,7 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -117,6 +119,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
   private final SocketFactory socketFactory;
+  private final boolean execUseFQDN;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, 
AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
@@ -140,6 +143,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     this.queryTracker = queryTracker;
     this.executorService = executorService;
     completionListener = (SchedulerFragmentCompletingListener) executorService;
+    this.execUseFQDN = 
conf.getBoolean(HiveConf.ConfVars.LLAP_DAEMON_EXEC_USE_FQDN.varname, true);
 
     // Distribute the available memory between the tasks.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) 
numExecutors);
@@ -285,10 +289,23 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, 
tezSummary.defaultBoolVal)
                              && callableConf.getBoolean(llapTasks.varname, 
llapTasks.defaultBoolVal);
 
+      final String llapHost;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // when kerberos is enabled always use FQDN
+        llapHost = localAddress.get().getHostName();
+      } else if (execUseFQDN) {
+        // when FQDN is explicitly requested (default)
+        llapHost = localAddress.get().getHostName();
+      } else {
+        // when FQDN is not requested, use ip address
+        llapHost = localAddress.get().getAddress().getHostAddress();
+      }
+      LOG.info("Using llap host: {} for execution context. hostName: {} 
hostAddress: {}", llapHost,
+        localAddress.get().getHostName(), 
localAddress.get().getAddress().getHostAddress());
       // TODO: ideally we'd register TezCounters here, but it seems impossible 
before registerTask.
       WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes);
       TaskRunnerCallable callable = new TaskRunnerCallable(request, 
fragmentInfo, callableConf,
-          new ExecutionContextImpl(localAddress.get().getHostName()), env,
+          new ExecutionContextImpl(llapHost), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, 
killedTaskHandler,
           this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
           completionListener, socketFactory, isGuaranteed, wmCounters);

Reply via email to