HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Conflicts: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdf4ef89 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdf4ef89 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdf4ef89 Branch: refs/heads/branch-2.1 Commit: bdf4ef8900ebf67fcb984469bb997c4f783bce89 Parents: b8903b3 Author: Sergey Shelukhin <ser...@apache.org> Authored: Fri Aug 12 13:33:39 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Fri Aug 12 13:43:52 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +- .../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java | 5 +++-- .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java | 7 ++----- .../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java | 5 ++--- .../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java | 8 +------- 5 files changed, 9 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 99cdaa0..416606e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem // the task in. On MR: The cache is a no-op. String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID); cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container"; - cache = ObjectCacheFactory.getCache(hconf, queryId); + cache = ObjectCacheFactory.getCache(hconf, queryId, false); loader = getHashTableLoader(hconf); hashMapRowGetters = null; http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java index 5201120..5a19030 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java @@ -44,10 +44,11 @@ public class ObjectCacheFactory { /** * Returns the appropriate cache */ - public static ObjectCache getCache(Configuration conf, String queryId) { + public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { if (LlapProxy.isDaemon()) { // daemon - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED) + && !isPlanCache) { // LLAP object cache, unlike others, does not use globals. Thus, get the existing one. return getLlapObjectCache(queryId); } else { // no cache http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 1e92f0a..7c8ea9b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -94,14 +93,12 @@ public class MapRecordProcessor extends RecordProcessor { public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - if (LlapProxy.isDaemon()) { // do not cache plan + if (LlapProxy.isDaemon()) { String id = queryId + "_" + context.getTaskIndex(); l4j.info("LLAP_OF_ID: "+id); jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id); - cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); - } else { - cache = ObjectCacheFactory.getCache(jconf, queryId); } + cache = ObjectCacheFactory.getCache(jconf, queryId, true); execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); cacheKeys = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index ec97856..b7f1011 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { private MergeFileWork mfWork; MRInputLegacy mrInput = null; private final Object[] row = new Object[2]; - ObjectCache cache; + org.apache.hadoop.hive.ql.exec.ObjectCache cache; public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) { super(jconf, context); @@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { } String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory - .getCache(jconf, queryId); + cache = ObjectCacheFactory.getCache(jconf, queryId, true); try { execContext.setJc(jconf); http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 1390a00..cf3c8ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -89,14 +89,8 @@ public class ReduceRecordProcessor extends RecordProcessor{ public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); - ObjectCache cache; - String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - if (LlapProxy.isDaemon()) { // don't cache plan - cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); - } else { - cache = ObjectCacheFactory.getCache(jconf, queryId); - } + cache = ObjectCacheFactory.getCache(jconf, queryId, true); String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY; cacheKeys = Lists.newArrayList(cacheKey);