[hive] branch master updated: HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)

2020-02-26 Thread prasanthj
This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
 new cfc12f0  HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
cfc12f0 is described below

commit cfc12f05f0c034f9aad149960e58d40902e0dcfe
Author: Rajesh Balamohan 
AuthorDate: Wed Feb 26 15:14:52 2020 -0800

HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
---
 .../hive/llap/tezplugins/LlapTaskCommunicator.java | 57 +-
 1 file changed, 24 insertions(+), 33 deletions(-)

diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e5dc378..b168f76 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -762,46 +762,37 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
-      HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
-      for (int i = 0; i < tasks.get().length; ++i) {
-        boolean isGuaranteed = false;
-        if (guaranteed != null) {
-          isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
-        }
-        attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
-      }
-      String error = "";
+      Set<TezTaskAttemptID> error = new HashSet<>();
       synchronized (biMap) {
-        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-          // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
-          //       or at least (in this case) track the latest unique ID for LlapNode and retry all
-          //       older-node tasks proactively. For now let the heartbeats fail them.
-          TezTaskAttemptID attemptId = entry.getValue();
-          String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
-          // Unique ID is registered based on Submit response. Theoretically, we could get a ping
-          // when the task is valid but we haven't stored the unique ID yet, so taskNodeId is null.
-          // However, the next heartbeat(s) should get the value eventually and mark task as alive.
-          // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
-          if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            Boolean isGuaranteed = attempts.get(attemptId);
-            if (isGuaranteed != null) {
-              getContext().taskAlive(attemptId);
-              scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
+        for (int i = 0; i < tasks.get().length; ++i) {
+          boolean isGuaranteed = false;
+          if (guaranteed != null) {
+            isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
+          }
+          TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
+
+          // Check if the taskAttempt is present in AM view
+          if (biMap.containsValue(attemptID)) {
+            String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
+            if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+              getContext().taskAlive(attemptID);
+              scheduler.taskInfoUpdated(attemptID, isGuaranteed);
+              getContext().containerAlive(biMap.inverse().get(attemptID));
             } else {
-              error += (attemptId + ", ");
+              error.add(attemptID);
             }
-            getContext().containerAlive(entry.getKey());
           }
         }
+      }

-        if (!error.isEmpty()) {
-          LOG.info("The tasks we expected to be on the node are not there: " + error);
-          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
-                "registered with different unique ID", entry.getValue());
-            getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+        for (TezTaskAttemptID attempt: error) {
+          LOG.info("Sending a kill for attempt {}, due to a ping from "
+              + "node with same host and same port but " +
+              "registered with different unique ID", attempt);
+          getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
               "Node with same host and port but with new unique ID pinged");
-          }
         }
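
Compared to the old loop, which killed every attempt mapped to the node whenever any expected attempt was missing, the new loop walks only the attempts reported in the heartbeat and kills only those registered under a stale unique node id. A minimal sketch of that per-attempt rule follows; the class, method, and parameter names are hypothetical and not part of the commit:

    import com.google.common.collect.BiMap;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.tez.dag.records.TezTaskAttemptID;

    // Hypothetical helper: true when a heartbeat-reported attempt should be marked alive.
    // Attempts the AM still tracks for this node but registered under a different unique
    // node id go into the kill set instead; attempts unknown to the AM are simply ignored.
    final class HeartbeatFilterSketch {
      static boolean keepAlive(TezTaskAttemptID attempt, String pingUniqueId,
          String registeredNodeId, BiMap<ContainerId, TezTaskAttemptID> amView) {
        return amView.containsValue(attempt)
            && registeredNodeId != null
            && registeredNodeId.equals(pingUniqueId);
      }
    }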
 

[hive] branch master updated: HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)

2020-02-26 Thread prasanthj
This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
 new f087eac  HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
f087eac is described below

commit f087eac18a64f97e890427c5bdeff72269a86b55
Author: Prasanth Jayachandran 
AuthorDate: Wed Feb 26 15:12:33 2020 -0800

HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java|  3 +++
 .../hive/llap/daemon/impl/ContainerRunnerImpl.java| 19 ++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e419dc5..bfc2695 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4380,6 +4380,9 @@ public class HiveConf extends Configuration {
       "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
     LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", true,
       "Whether to use FQDN of the AM machine when submitting work to LLAP."),
+    LLAP_DAEMON_EXEC_USE_FQDN("hive.llap.exec.use.fqdn", true,
+      "On non-kerberized clusters, where the hostnames are stable but ip address changes, setting this config\n" +
+        " to false will use ip address of llap daemon in execution context instead of FQDN"),
     // Not used yet - since the Writable RPC engine does not support this policy.
     LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
       "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2ae7871..6a13b55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,6 +15,7 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -117,6 +119,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
   private final SocketFactory socketFactory;
+  private final boolean execUseFQDN;

   public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
@@ -140,6 +143,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.queryTracker = queryTracker;
     this.executorService = executorService;
     completionListener = (SchedulerFragmentCompletingListener) executorService;
+    this.execUseFQDN = conf.getBoolean(HiveConf.ConfVars.LLAP_DAEMON_EXEC_USE_FQDN.varname, true);

     // Distribute the available memory between the tasks.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors);
@@ -285,10 +289,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
           && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);

+      final String llapHost;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // when kerberos is enabled always use FQDN
+        llapHost = localAddress.get().getHostName();
+      } else if (execUseFQDN) {
+        // when FQDN is explicitly requested (default)
+        llapHost = localAddress.get().getHostName();
+      } else {
+        // when FQDN is not requested, use ip address
+        llapHost = localAddress.get().getAddress().getHostAddress();
+      }
+      LOG.info("Using llap host: {} for execution context. hostName: {} hostAddress: {}", llapHost,
+        localAddress.get().getHostName(), localAddress.get().getAddress().getHostAddress());
       // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask.
       WmFragmentCounters 
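
Condensed, the host choice above reduces to a single expression; a hedged restatement follows, with a hypothetical helper class and method name but the same behavior as the diff:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hedged restatement of the selection above: kerberos always forces the FQDN;
    // otherwise the new hive.llap.exec.use.fqdn flag decides between FQDN and ip.
    final class LlapHostSelectionSketch {
      static String llapHost(InetSocketAddress addr, boolean execUseFQDN) {
        return (UserGroupInformation.isSecurityEnabled() || execUseFQDN)
            ? addr.getHostName()
            : addr.getAddress().getHostAddress();
      }
    }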

[hive] branch master updated: HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)

2020-02-26 Thread lpinter
This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
 new 4b93296  HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
4b93296 is described below

commit 4b932966428c7bb58f1307e459849ac092fa9cbc
Author: Laszlo Pinter 
AuthorDate: Wed Feb 26 09:42:01 2020 +0100

HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/exec/FileSinkOperator.java  |  21 +++-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java  |  23 +---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java|  50 -
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   |   8 ++
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  66 +--
 .../hive/ql/txn/compactor/MinorQueryCompactor.java | 114 ---
 .../hive/ql/txn/compactor/QueryCompactor.java  | 123 -
 8 files changed, 181 insertions(+), 228 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d5e1b5b..d0f452b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -473,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator implements
   protected transient boolean multiFileSpray;
   protected transient final Map bucketMap = new HashMap();
   private transient boolean isBucketed = false;
+  private transient int bucketId;

   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner prtner;
@@ -805,7 +807,12 @@ public class FileSinkOperator extends TerminalOperator implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      if (conf.isCompactionTable()) {
+        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+            isNativeTable(), isSkewedStoredAsSubDirectories);
+      } else {
+        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
             + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
@@ -828,7 +835,7 @@ public class FileSinkOperator extends TerminalOperator implements
       //todo IOW integration. Full Acid uses the else if block to create Acid's RecordUpdater (HiveFileFormatUtils)
       // and that will set writingBase(conf.getInsertOverwrite())
       // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
         Path outPath = fsp.outPaths[filesIdx];
         if (conf.isMmTable()
             && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
@@ -960,6 +967,10 @@ public class FileSinkOperator extends TerminalOperator implements
           createNewPaths(null, lbDirName);
         }
       } else {
+        if (conf.isCompactionTable()) {
+          int bucketProperty = ((IntWritable)((Object[])row)[2]).get();
+          bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+        }
         createBucketFiles(fsp);
       }
     }
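
The decode-and-name step added above can be exercised in isolation; a minimal sketch using the same AcidUtils constants and BucketCodec call referenced in the diff, where the sample bucketProperty is an arbitrary V0-style value (the property equals the writer id) rather than data from a real acid row:

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.BucketCodec;

    // Minimal sketch: recover the writer id from a bucketProperty value and format
    // the compactor output file name, mirroring createBucketForFileIdx() above.
    public class CompactorBucketNameSketch {
      public static void main(String[] args) {
        int bucketProperty = 3; // arbitrary V0-style example value
        int bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
        // prints "bucket_00003"
        System.out.println(AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId));
      }
    }
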
@@ -1049,7 +1060,7 @@ public class FileSinkOperator extends TerminalOperator implements
       // for a given operator branch prediction should work quite nicely on it.
       // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
       // pass the row rather than recordValue.
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
         rowOutWriters[findWriterOffset(row)].write(recordValue);
       } else if (conf.getWriteType() ==