This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new f087eac HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V) f087eac is described below commit f087eac18a64f97e890427c5bdeff72269a86b55 Author: Prasanth Jayachandran <prasan...@apache.org> AuthorDate: Wed Feb 26 15:12:33 2020 -0800 HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +++ .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 19 ++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e419dc5..bfc2695 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4380,6 +4380,9 @@ public class HiveConf extends Configuration { "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"), LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", true, "Whether to use FQDN of the AM machine when submitting work to LLAP."), + LLAP_DAEMON_EXEC_USE_FQDN("hive.llap.exec.use.fqdn", true, + "On non-kerberized clusters, where the hostnames are stable but ip address changes, setting this config\n" + + " to false will use ip address of llap daemon in execution context instead of FQDN"), // Not used yet - since the Writable RPC engine does not support this policy. 
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS( "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms", diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2ae7871..6a13b55 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -15,6 +15,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; @@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapNodeId; @@ -117,6 +119,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private final DaemonId daemonId; private final UgiFactory fsUgiFactory; private final SocketFactory socketFactory; + private final boolean execUseFQDN; public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress, @@ -140,6 +143,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.queryTracker = queryTracker; this.executorService = executorService; completionListener = (SchedulerFragmentCompletingListener) executorService; + this.execUseFQDN = conf.getBoolean(HiveConf.ConfVars.LLAP_DAEMON_EXEC_USE_FQDN.varname, true); // Distribute the available memory between the tasks. 
this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors); @@ -285,10 +289,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); + final String llapHost; + if (UserGroupInformation.isSecurityEnabled()) { + // when kerberos is enabled always use FQDN + llapHost = localAddress.get().getHostName(); + } else if (execUseFQDN) { + // when FQDN is explicitly requested (default) + llapHost = localAddress.get().getHostName(); + } else { + // when FQDN is not requested, use ip address + llapHost = localAddress.get().getAddress().getHostAddress(); + } + LOG.info("Using llap host: {} for execution context. hostName: {} hostAddress: {}", llapHost, + localAddress.get().getHostName(), localAddress.get().getAddress().getHostAddress()); // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask. WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, - new ExecutionContextImpl(localAddress.get().getHostName()), env, + new ExecutionContextImpl(llapHost), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi, completionListener, socketFactory, isGuaranteed, wmCounters);