Repository: hive Updated Branches: refs/heads/master 57d1f3d85 -> 8866d93b1
HIVE-13518 : Hive on Tez: Shuffle joins do not choose the right 'big' table. (Vikram Dixit via Gunther Hagleitner) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8866d93b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8866d93b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8866d93b Branch: refs/heads/master Commit: 8866d93b14a3489018e6516f8b85ef1a7f18960b Parents: 57d1f3d Author: Ashutosh Chauhan <hashut...@apache.org> Authored: Tue May 31 13:54:59 2016 -0700 Committer: Ashutosh Chauhan <hashut...@apache.org> Committed: Tue May 31 13:57:51 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 84 +++++++++++--------- .../apache/hadoop/hive/ql/parse/GenTezWork.java | 3 +- .../clientpositive/tez/metadataonly1.q.out | 2 +- .../clientpositive/tez/vectorized_ptf.q.out | 2 +- 4 files changed, 49 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8866d93b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index b35f075..387f47d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; JoinOperator joinOp = (JoinOperator) nd; + long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { @@ -110,7 +111,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize); if (mapJoinConversionPos < 0) { Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx); if (retval == null) { @@ -134,7 +135,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // check if we can convert to map join no bucket scaling. LOG.info("Convert to non-bucketed map join"); if (numBuckets != 1) { - mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1); + mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize); } if (mapJoinConversionPos < 0) { // we are just converting to a common merge join operator. The shuffle @@ -359,7 +360,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // MapRecordProcessor and ReduceRecordProcessor with respect to the sources. @SuppressWarnings({"rawtypes","unchecked"}) Set<ReduceSinkOperator> set = - OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(), + OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(), ReduceSinkOperator.class); if (size < 0) { size = set.size(); @@ -505,44 +506,42 @@ public class ConvertJoinMapJoin implements NodeProcessor { } public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, - int buckets) throws SemanticException { - /* - * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is - * an outer join down the join tree that requires filterTag. We disable this conversion to map - * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a - * new operation to be able to support this. This seems like a corner case enough to special - * case this for now. - */ - if (joinOp.getConf().getConds().length > 1) { - boolean hasOuter = false; - for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) { - switch (joinCondDesc.getType()) { - case JoinDesc.INNER_JOIN: - case JoinDesc.LEFT_SEMI_JOIN: - case JoinDesc.UNIQUE_JOIN: - hasOuter = false; - break; - - case JoinDesc.FULL_OUTER_JOIN: - case JoinDesc.LEFT_OUTER_JOIN: - case JoinDesc.RIGHT_OUTER_JOIN: - hasOuter = true; - break; - - default: - throw new SemanticException("Unknown join type " + joinCondDesc.getType()); + int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException { + if (!skipJoinTypeChecks) { + /* + * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is + * an outer join down the join tree that requires filterTag. We disable this conversion to map + * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a + * new operation to be able to support this. This seems like a corner case enough to special + * case this for now. + */ + if (joinOp.getConf().getConds().length > 1) { + boolean hasOuter = false; + for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) { + switch (joinCondDesc.getType()) { + case JoinDesc.INNER_JOIN: + case JoinDesc.LEFT_SEMI_JOIN: + case JoinDesc.UNIQUE_JOIN: + hasOuter = false; + break; + + case JoinDesc.FULL_OUTER_JOIN: + case JoinDesc.LEFT_OUTER_JOIN: + case JoinDesc.RIGHT_OUTER_JOIN: + hasOuter = true; + break; + + default: + throw new SemanticException("Unknown join type " + joinCondDesc.getType()); + } + } + if (hasOuter) { + return -1; } - } - if (hasOuter) { - return -1; } } Set<Integer> bigTableCandidateSet = MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds()); - - long maxSize = context.conf.getLongVar( - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - int bigTablePosition = -1; // big input cumulative row count long bigInputCumulativeCardinality = -1L; @@ -576,7 +575,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { // on size and there's another one that's bigger. return -1; } - + if (inputSize/buckets > maxSize) { if (!bigTableCandidateSet.contains(pos)) { // can't use the current table as the big table, but it's too @@ -826,7 +825,9 @@ public class ConvertJoinMapJoin implements NodeProcessor { // Since we don't have big table index yet, must start with estimate of numReducers int numReducers = estimateNumBuckets(joinOp, false); LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers"); - int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers); + int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, + context.conf.getLongVar( + HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD)); if (bigTablePos >= 0) { // Now that we have the big table index, get real numReducers value based on big table RS ReduceSinkOperator bigTableParentRS = @@ -869,9 +870,14 @@ public class ConvertJoinMapJoin implements NodeProcessor { } } + int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false), + true, Long.MAX_VALUE); + if (pos < 0) { + LOG.info("Could not get a valid join position. Defaulting to position 0"); + pos = 0; + } // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - int pos = 0; // it doesn't matter which position we use in this case. LOG.info("Fallback to common merge join operator"); convertJoinSMBJoin(joinOp, context, pos, 0, false); } http://git-wip-us.apache.org/repos/asf/hive/blob/8866d93b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 46d279e..461ba37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -168,7 +168,8 @@ public class GenTezWork implements NodeProcessor { getParentFromStack(context.currentMergeJoinOperator, stack); // Set the big table position. Both the reduce work and merge join operator // should be set with the same value. - int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); +// int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); + int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition(); work.setTag(pos); context.currentMergeJoinOperator.getConf().setBigTablePosition(pos); tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); http://git-wip-us.apache.org/repos/asf/hive/blob/8866d93b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out index 15f5ed5..4075b81 100644 --- a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out +++ b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out @@ -772,7 +772,7 @@ STAGE PLANS: keys: 0 _col0 (type: string) 1 _col0 (type: string) - Position of Big Table: 0 + Position of Big Table: 1 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE Group By Operator aggregations: count() http://git-wip-us.apache.org/repos/asf/hive/blob/8866d93b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out index 0a62262..0435d28 100644 --- a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out +++ b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out @@ -2117,7 +2117,7 @@ STAGE PLANS: 0 p_partkey (type: int) 1 _col0 (type: int) outputColumnNames: _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20 - Position of Big Table: 0 + Position of Big Table: 1 Statistics: Num rows: 28 Data size: 17646 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col12 (type: int), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: int), _col18 (type: string), _col19 (type: double), _col20 (type: string)