[hive] branch master updated: HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cfc12f0  HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
cfc12f0 is described below

commit cfc12f05f0c034f9aad149960e58d40902e0dcfe
Author: Rajesh Balamohan
AuthorDate: Wed Feb 26 15:14:52 2020 -0800

    HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
---
 .../hive/llap/tezplugins/LlapTaskCommunicator.java | 57 +-
 1 file changed, 24 insertions(+), 33 deletions(-)

diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e5dc378..b168f76 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -762,46 +762,37 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
-      HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
-      for (int i = 0; i < tasks.get().length; ++i) {
-        boolean isGuaranteed = false;
-        if (guaranteed != null) {
-          isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
-        }
-        attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
-      }
-      String error = "";
+      Set<TezTaskAttemptID> error = new HashSet<>();
       synchronized (biMap) {
-        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-          // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
-          //       or at least (in this case) track the latest unique ID for LlapNode and retry all
-          //       older-node tasks proactively. For now let the heartbeats fail them.
-          TezTaskAttemptID attemptId = entry.getValue();
-          String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
-          // Unique ID is registered based on Submit response. Theoretically, we could get a ping
-          // when the task is valid but we haven't stored the unique ID yet, so taskNodeId is null.
-          // However, the next heartbeat(s) should get the value eventually and mark task as alive.
-          // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
-          if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            Boolean isGuaranteed = attempts.get(attemptId);
-            if (isGuaranteed != null) {
-              getContext().taskAlive(attemptId);
-              scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
+        for (int i = 0; i < tasks.get().length; ++i) {
+          boolean isGuaranteed = false;
+          if (guaranteed != null) {
+            isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
+          }
+          TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
+
+          // Check if the taskAttempt is present in AM view
+          if (biMap.containsValue(attemptID)) {
+            String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
+            if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+              getContext().taskAlive(attemptID);
+              scheduler.taskInfoUpdated(attemptID, isGuaranteed);
+              getContext().containerAlive(biMap.inverse().get(attemptID));
             } else {
-              error += (attemptId + ", ");
+              error.add(attemptID);
             }
-            getContext().containerAlive(entry.getKey());
           }
         }
+      }

-        if (!error.isEmpty()) {
-          LOG.info("The tasks we expected to be on the node are not there: " + error);
-          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
-                "registered with different unique ID", entry.getValue());
-            getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+        for (TezTaskAttemptID attempt: error) {
+          LOG.info("Sending a kill for attempt {}, due to a ping from " +
+              "node with same host and same port but " +
+              "registered with different unique ID", attempt);
+          getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
              "Node with same host and port but with new unique ID pinged");
-          }
        }
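[Editor's note] The shape of the fix is easier to see outside the diff: the heartbeat handler now walks only the attempts reported by the node, checks each one against the AM-side container/attempt BiMap, and collects just the mismatched attempts for a kill, instead of killing every attempt mapped to the node. Below is a minimal, self-contained sketch of that flow; the String IDs and map names are illustrative stand-ins for the real ContainerId/TezTaskAttemptID types and the entityTracker state, with a Guava HashBiMap standing in for the tracker's map (HashBiMap keeps an inverse index, so containsValue() and inverse() are constant-time).

    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Toy model of the reworked heartbeat filtering. Only attempts reported in
    // the heartbeat AND known to the AM are considered; of those, only attempts
    // registered under a different unique node ID are collected for a kill.
    public class HeartbeatFilterSketch {
      public static void main(String[] args) {
        BiMap<String, String> containerToAttempt = HashBiMap.create();
        containerToAttempt.put("container_1", "attempt_1");
        containerToAttempt.put("container_2", "attempt_2");

        Map<String, String> attemptToUniqueNodeId = new HashMap<>();
        attemptToUniqueNodeId.put("attempt_1", "node-uuid-A");
        attemptToUniqueNodeId.put("attempt_2", "node-uuid-B"); // registered on an older node instance

        String pingUniqueId = "node-uuid-A"; // unique ID carried by this ping
        String[] heartbeatAttempts = {"attempt_1", "attempt_2", "attempt_unknown"};
        Set<String> toKill = new HashSet<>();

        for (String attempt : heartbeatAttempts) {
          // containsValue() and inverse() are O(1) on a HashBiMap.
          if (containerToAttempt.containsValue(attempt)) {
            String taskNodeId = attemptToUniqueNodeId.get(attempt);
            if (taskNodeId != null && taskNodeId.equals(pingUniqueId)) {
              System.out.println("alive: " + attempt
                  + " in " + containerToAttempt.inverse().get(attempt));
            } else {
              // The old code killed *every* attempt mapped to the node here.
              toKill.add(attempt);
            }
          }
          // Unknown attempts are simply skipped; later heartbeats resolve them.
        }
        System.out.println("kill only: " + toKill); // [attempt_2]
      }
    }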
[hive] branch master updated: HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f087eac  HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
f087eac is described below

commit f087eac18a64f97e890427c5bdeff72269a86b55
Author: Prasanth Jayachandran
AuthorDate: Wed Feb 26 15:12:33 2020 -0800

    HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  3 +++
 .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 19 ++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e419dc5..bfc2695 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4380,6 +4380,9 @@ public class HiveConf extends Configuration {
         "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
     LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", true,
         "Whether to use FQDN of the AM machine when submitting work to LLAP."),
+    LLAP_DAEMON_EXEC_USE_FQDN("hive.llap.exec.use.fqdn", true,
+        "On non-kerberized clusters, where the hostnames are stable but ip address changes, setting this config\n" +
+        " to false will use ip address of llap daemon in execution context instead of FQDN"),
     // Not used yet - since the Writable RPC engine does not support this policy.
     LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
         "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2ae7871..6a13b55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,6 +15,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -117,6 +119,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRunner
   private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
   private final SocketFactory socketFactory;
+  private final boolean execUseFQDN;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors,
       AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress,
@@ -140,6 +143,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRunner
     this.queryTracker = queryTracker;
     this.executorService = executorService;
     completionListener = (SchedulerFragmentCompletingListener) executorService;
+    this.execUseFQDN = conf.getBoolean(HiveConf.ConfVars.LLAP_DAEMON_EXEC_USE_FQDN.varname, true);
 
     // Distribute the available memory between the tasks.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors);
@@ -285,10 +289,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRunner
     boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
         && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
 
+    final String llapHost;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // when kerberos is enabled always use FQDN
+      llapHost = localAddress.get().getHostName();
+    } else if (execUseFQDN) {
+      // when FQDN is explicitly requested (default)
+      llapHost = localAddress.get().getHostName();
+    } else {
+      // when FQDN is not requested, use ip address
+      llapHost = localAddress.get().getAddress().getHostAddress();
+    }
+    LOG.info("Using llap host: {} for execution context. hostName: {} hostAddress: {}", llapHost,
+        localAddress.get().getHostName(), localAddress.get().getAddress().getHostAddress());
+
     // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask.
     WmFragmentCounters
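[Editor's note] Stripped of the surrounding plumbing, the new behavior reduces to a three-way choice on java.net.InetSocketAddress: getHostName() for the FQDN versus getAddress().getHostAddress() for the IP. A minimal sketch follows, with kerberosEnabled standing in for UserGroupInformation.isSecurityEnabled() and execUseFQDN mirroring hive.llap.exec.use.fqdn (default true); the class and method names are illustrative, not Hive's.

    import java.net.InetSocketAddress;

    // Standalone model of the host-selection policy HIVE-22922 adds to
    // ContainerRunnerImpl when building the execution context.
    public class LlapHostSelectionSketch {
      static String selectLlapHost(InetSocketAddress addr,
          boolean kerberosEnabled, boolean execUseFQDN) {
        if (kerberosEnabled || execUseFQDN) {
          // Kerberized clusters always use the FQDN; it is also the
          // default on non-secure clusters.
          return addr.getHostName();
        }
        // hive.llap.exec.use.fqdn=false on a non-kerberized cluster:
        // hand out the daemon's IP address instead of the FQDN.
        return addr.getAddress().getHostAddress();
      }

      public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("localhost", 15551);
        System.out.println(selectLlapHost(addr, false, true));  // e.g. localhost
        System.out.println(selectLlapHost(addr, false, false)); // e.g. 127.0.0.1
      }
    }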
[hive] branch master updated: HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b93296  HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
4b93296 is described below

commit 4b932966428c7bb58f1307e459849ac092fa9cbc
Author: Laszlo Pinter
AuthorDate: Wed Feb 26 09:42:01 2020 +0100

    HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  21 +++-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      |  23 +---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  50 +-
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   |   8 ++
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  66 +--
 .../hive/ql/txn/compactor/MinorQueryCompactor.java | 114 ---
 .../hive/ql/txn/compactor/QueryCompactor.java      | 123 +-
 8 files changed, 181 insertions(+), 228 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d5e1b5b..d0f452b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -473,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
   private transient boolean isBucketed = false;
+  private transient int bucketId;
   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -805,7 +807,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      if (conf.isCompactionTable()) {
+        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+            isNativeTable(), isSkewedStoredAsSubDirectories);
+      } else {
+        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
             + "; out path " + fsp.outPaths[filesIdx] + " (spec path " + specPath + ", tmp path "
@@ -828,7 +835,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       //todo IOW integration. Full Acid uses the else if block to create Acid's RecordUpdater (HiveFileFormatUtils)
      // and that will set writingBase(conf.getInsertOverwrite())
      // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
        Path outPath = fsp.outPaths[filesIdx];
        if (conf.isMmTable()
            && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
@@ -960,6 +967,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
          createNewPaths(null, lbDirName);
        }
      } else {
+        if (conf.isCompactionTable()) {
+          int bucketProperty = ((IntWritable)((Object[])row)[2]).get();
+          bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+        }
        createBucketFiles(fsp);
      }
    }
@@ -1049,7 +1060,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
    // for a given operator branch prediction should work quite nicely on it.
    // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
    // pass the row rather than recordValue.
-    if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+    if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
      rowOutWriters[findWriterOffset(row)].write(recordValue);
    } else if (conf.getWriteType() ==
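[Editor's note] The core of the rename elimination is visible in the hunks above: for a compaction query, FileSinkOperator decodes the writer id from the bucket property carried in the row (position 2) and uses it to name the final bucket file directly, so no temp-file rename pass is needed afterwards. A small sketch of that naming step follows; the "bucket_" and "%05d" literals are assumed to match AcidUtils.BUCKET_PREFIX and AcidUtils.BUCKET_DIGITS, and BucketCodec's bit layout is deliberately elided (the patch obtains the id via BucketCodec.determineVersion(prop).decodeWriterId(prop)).

    // Illustrative helper, not Hive code: how a compaction writer can derive
    // the final bucket file name from a decoded writer id.
    public class CompactorBucketNameSketch {
      static final String BUCKET_PREFIX = "bucket_"; // assumed = AcidUtils.BUCKET_PREFIX
      static final String BUCKET_DIGITS = "%05d";    // assumed = AcidUtils.BUCKET_DIGITS

      static String bucketFileName(int bucketId) {
        return BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucketId);
      }

      public static void main(String[] args) {
        // The decoded writer id becomes the file name the compactor writes to,
        // e.g. base_0000010/bucket_00003, with no rename afterwards.
        System.out.println(bucketFileName(3));  // bucket_00003
        System.out.println(bucketFileName(42)); // bucket_00042
      }
    }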