Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java?rev=1582623&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java Fri Mar 28 06:11:07 2014 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.ParseContext; + +public class AnnotateOpTraitsProcCtx implements NodeProcessorCtx { + + ParseContext parseContext; + HiveConf conf; + + public AnnotateOpTraitsProcCtx(ParseContext parseContext) { + this.setParseContext(parseContext); + if(parseContext != null) { + this.setConf(parseContext.getConf()); + } else { + this.setConf(null); + } + } + + public HiveConf getConf() { + return conf; + } + + public void setConf(HiveConf conf) { + this.conf = conf; + } + + public ParseContext getParseContext() { + return parseContext; + } + + public void setParseContext(ParseContext parseContext) { + this.parseContext = parseContext; + } + +}
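A note on usage: rule processors receive this context through the generic NodeProcessorCtx handle and downcast it back. A minimal sketch of such a processor follows; the class name and the trailing comment are illustrative only, not part of this commit:

    import java.util.Stack;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // illustrative processor, not part of this commit
    public class ExampleTraitsProc implements NodeProcessor {
      @Override
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          Object... nodeOutputs) throws SemanticException {
        // recover the typed context set up by AnnotateWithOpTraits
        AnnotateOpTraitsProcCtx ctx = (AnnotateOpTraitsProcCtx) procCtx;
        HiveConf conf = ctx.getConf();           // may be null when no ParseContext was supplied
        ParseContext pctx = ctx.getParseContext();
        // ... inspect the operator (nd) and annotate it using conf/pctx ...
        return null;
      }
    }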
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1582623&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Fri Mar 28 06:11:07 2014 @@ -0,0 +1,78 @@ +package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.DemuxOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.MuxOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.OpTraitsRulesProcFactory; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/* + * This class annotates each operator with its traits. The OpTraits class + * specifies the traits that are populated for each operator. + */ +public class AnnotateWithOpTraits implements Transform { + + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + AnnotateOpTraitsProcCtx annotateCtx = new AnnotateOpTraitsProcCtx(pctx); + + // create a walker which walks the tree in a DFS manner while maintaining the + // operator stack. 
The rules registered below fire as the walker visits matching operators.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getTableScanRule());
+    opRules.put(new RuleRegExp("RS", ReduceSinkOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getReduceSinkRule());
+    opRules.put(new RuleRegExp("JOIN", JoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getJoinRule());
+    opRules.put(new RuleRegExp("MAPJOIN", MapJoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("SMB", SMBMapJoinOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("MUX", MuxOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("DEMUX", DemuxOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("UNION", UnionOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getMultiParentRule());
+    opRules.put(new RuleRegExp("GBY", GroupByOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getGroupByRule());
+    opRules.put(new RuleRegExp("SEL", SelectOperator.getOperatorName() + "%"),
+        OpTraitsRulesProcFactory.getSelectRule());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(OpTraitsRulesProcFactory.getDefaultRule(), opRules,
+        annotateCtx);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of topOp nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1582623&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Fri Mar 28 06:11:07 2014
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.AbstractBucketJoinProc;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * This class populates the following operator traits for the entire operator tree:
+ * 1. Bucketing columns.
+ * 2. Table
+ * 3. Pruned partitions
+ *
+ * Bucketing columns refer not to the bucketing columns from the table object but
+ * to the dynamic 'bucketing' done by operators such as reduce sinks and group-bys.
+ * All the operators have a translation from their input names to the output names
+ * corresponding to the bucketing column. The colExprMap that is a part of every
+ * operator is used in this transformation.
+ *
+ * The table object is used for the base case in map-reduce when deciding to perform a bucket
+ * map join. This object is used in the BucketMapJoinProc to find out whether the number of
+ * files for the table corresponds to the number of buckets specified in the metadata.
+ *
+ * The pruned partition information has the same purpose as the table object at the moment.
+ *
+ * Traits such as sortedness can be populated as well, for future optimizations to make use of.
+ */
+
+public class OpTraitsRulesProcFactory {
+
+  public static class DefaultRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      @SuppressWarnings("unchecked")
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)nd;
+      op.setOpTraits(op.getParentOperators().get(0).getOpTraits());
+      return null;
+    }
+
+  }
+
+  /*
+   * The reduce sink operator is the de-facto operator
+   * for determining keyCols (the keys emitted by a map phase).
+   */
+  public static class ReduceSinkRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      ReduceSinkOperator rs = (ReduceSinkOperator)nd;
+      List<String> bucketCols = new ArrayList<String>();
+      if (rs.getColumnExprMap() != null) {
+        for (ExprNodeDesc exprDesc : rs.getConf().getKeyCols()) {
+          for (Entry<String, ExprNodeDesc> entry : rs.getColumnExprMap().entrySet()) {
+            if (exprDesc.isSame(entry.getValue())) {
+              bucketCols.add(entry.getKey());
+            }
+          }
+        }
+      }
+
+      List<List<String>> listBucketCols = new ArrayList<List<String>>();
+      listBucketCols.add(bucketCols);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      rs.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  /*
+   * The table scan operator has the table object and the pruned partitions, which carry
+   * information such as bucketing and sorting that is used later for optimization.
+   */
+  public static class TableScanRule implements NodeProcessor {
+
+    public boolean checkBucketedTable(Table tbl,
+        ParseContext pGraphContext,
+        PrunedPartitionList prunedParts) throws SemanticException {
+
+      if (tbl.isPartitioned()) {
+        List<Partition> partitions = prunedParts.getNotDeniedPartns();
+        // check that each partition's number of bucket files matches its declared bucket count
+        if (!partitions.isEmpty()) {
+          for (Partition p : partitions) {
+            List<String> fileNames =
+                AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext);
+            // The number of files for the partition should be the same as the number of buckets.
+            int bucketCount = p.getBucketCount();
+
+            if (fileNames.size() != 0 && fileNames.size() != bucketCount) {
+              return false;
+            }
+          }
+        }
+      } else {
+
+        List<String> fileNames =
+            AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext);
+        int num = tbl.getNumBuckets();
+
+        // The number of files for the table should be the same as the number of buckets.
+        if (fileNames.size() != 0 && fileNames.size() != num) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator ts = (TableScanOperator)nd;
+      AnnotateOpTraitsProcCtx opTraitsCtx = (AnnotateOpTraitsProcCtx)procCtx;
+      Table table = opTraitsCtx.getParseContext().getTopToTable().get(ts);
+      PrunedPartitionList prunedPartList = null;
+      try {
+        prunedPartList =
+            opTraitsCtx.getParseContext().getPrunedPartitions(ts.getConf().getAlias(), ts);
+      } catch (HiveException e) {
+        prunedPartList = null;
+      }
+      boolean bucketMapJoinConvertible = checkBucketedTable(table,
+          opTraitsCtx.getParseContext(), prunedPartList);
+      List<List<String>> bucketCols = new ArrayList<List<String>>();
+      int numBuckets = -1;
+      if (bucketMapJoinConvertible) {
+        bucketCols.add(table.getBucketCols());
+        numBuckets = table.getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+      ts.setOpTraits(opTraits);
+      return null;
+    }
+  }
+
+  /*
+   * Group-by re-orders the keys it emits; hence, the keyCols change.
+   */
+  public static class GroupByRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object...
nodeOutputs) throws SemanticException { + GroupByOperator gbyOp = (GroupByOperator)nd; + List<String> gbyKeys = new ArrayList<String>(); + for (ExprNodeDesc exprDesc : gbyOp.getConf().getKeys()) { + for (Entry<String, ExprNodeDesc> entry : gbyOp.getColumnExprMap().entrySet()) { + if (exprDesc.isSame(entry.getValue())) { + gbyKeys.add(entry.getKey()); + } + } + } + + List<List<String>> listBucketCols = new ArrayList<List<String>>(); + listBucketCols.add(gbyKeys); + OpTraits opTraits = new OpTraits(listBucketCols, -1); + gbyOp.setOpTraits(opTraits); + return null; + } + } + + public static class SelectRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + SelectOperator selOp = (SelectOperator)nd; + List<List<String>> parentBucketColNames = + selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); + + List<List<String>> listBucketCols = new ArrayList<List<String>>(); + if (selOp.getColumnExprMap() != null) { + if (parentBucketColNames != null) { + for (List<String> colNames : parentBucketColNames) { + List<String> bucketColNames = new ArrayList<String>(); + for (String colName : colNames) { + for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) { + if (entry.getValue() instanceof ExprNodeColumnDesc) { + if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) { + bucketColNames.add(entry.getKey()); + } + } + } + } + listBucketCols.add(bucketColNames); + } + } + } + + int numBuckets = -1; + if (selOp.getParentOperators().get(0).getOpTraits() != null) { + numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets(); + } + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets); + selOp.setOpTraits(opTraits); + return null; + } + } + + public static class JoinRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + JoinOperator joinOp = (JoinOperator)nd; + List<List<String>> bucketColsList = new ArrayList<List<String>>(); + byte pos = 0; + for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + if (!(parentOp instanceof ReduceSinkOperator)) { + // can be mux operator + break; + } + ReduceSinkOperator rsOp = (ReduceSinkOperator)parentOp; + if (rsOp.getOpTraits() == null) { + ReduceSinkRule rsRule = new ReduceSinkRule(); + rsRule.process(rsOp, stack, procCtx, nodeOutputs); + } + bucketColsList.add(getOutputColNames(joinOp, rsOp, pos)); + pos++; + } + + joinOp.setOpTraits(new OpTraits(bucketColsList, -1)); + return null; + } + + private List<String> getOutputColNames(JoinOperator joinOp, + ReduceSinkOperator rs, byte pos) { + List<List<String>> parentBucketColNames = + rs.getOpTraits().getBucketColNames(); + + if (parentBucketColNames != null) { + List<String> bucketColNames = new ArrayList<String>(); + + // guaranteed that there is only 1 list within this list because + // a reduce sink always brings down the bucketing cols to a single list. 
+ // may not be true with correlation operators (mux-demux) + List<String> colNames = parentBucketColNames.get(0); + for (String colName : colNames) { + for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) { + if (exprNode instanceof ExprNodeColumnDesc) { + if(((ExprNodeColumnDesc)(exprNode)).getColumn().equals(colName)) { + for (Entry<String, ExprNodeDesc> entry : joinOp.getColumnExprMap().entrySet()) { + if (entry.getValue().isSame(exprNode)) { + bucketColNames.add(entry.getKey()); + // we have found the colName + break; + } + } + } else { + // continue on to the next exprNode to find a match + continue; + } + // we have found the colName. No need to search more exprNodes. + break; + } + } + } + + return bucketColNames; + } + + // no col names in parent + return null; + } + } + + /* + * When we have operators that have multiple parents, it is not + * clear which parent's traits we need to propagate forward. + */ + public static class MultiParentRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + OpTraits opTraits = new OpTraits(null, -1); + @SuppressWarnings("unchecked") + Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd; + operator.setOpTraits(opTraits); + return null; + } + } + + public static NodeProcessor getTableScanRule() { + return new TableScanRule(); + } + + public static NodeProcessor getReduceSinkRule() { + return new ReduceSinkRule(); + } + + public static NodeProcessor getSelectRule() { + return new SelectRule(); + } + + public static NodeProcessor getDefaultRule() { + return new DefaultRule(); + } + + public static NodeProcessor getMultiParentRule() { + return new MultiParentRule(); + } + + public static NodeProcessor getGroupByRule() { + return new GroupByRule(); + } + + public static NodeProcessor getJoinRule() { + return new JoinRule(); + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Mar 28 06:11:07 2014 @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; @@ -45,6 +47,8 @@ import org.apache.hadoop.hive.ql.plan.De import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; /** @@ -89,7 +93,7 @@ public class GenTezProcContext implement // a map that keeps track of work that need to be linked while // traversing an operator tree - public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap; + public final Map<Operator<?>, Map<BaseWork,TezEdgeProperty>> linkOpWithWorkMap; // a map to keep track of what reduce sinks have to be 
hooked up to // map join work @@ -144,7 +148,7 @@ public class GenTezProcContext implement this.currentTask = (TezTask) TaskFactory.get( new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf); this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>(); - this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>(); + this.linkOpWithWorkMap = new HashMap<Operator<?>, Map<BaseWork, TezEdgeProperty>>(); this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>(); this.mapJoinWorkMap = new HashMap<MapJoinOperator, List<BaseWork>>(); this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>(); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Mar 28 06:11:07 2014 @@ -46,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; -import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; /** * GenTezUtils is a collection of shared helper methods to produce @@ -104,9 +105,10 @@ public class GenTezUtils { setupReduceSink(context, reduceWork, reduceSink); tezWork.add(reduceWork); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); tezWork.connect( context.preceedingWork, - reduceWork, EdgeType.SIMPLE_EDGE); + reduceWork, edgeProp); context.connectedReduceSinks.add(reduceSink); return reduceWork; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Fri Mar 28 06:11:07 2014 @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Stack; import org.apache.commons.logging.Log; @@ -45,9 +46,10 @@ import org.apache.hadoop.hive.ql.plan.Ma import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; -import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; /** * GenTezWork separates the operator tree into tez tasks. @@ -160,30 +162,34 @@ public class GenTezWork implements NodeP * RS following the TS, we have already generated work for the TS-RS. * We need to hook the current work to this generated work. 
*/ - List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(mj); - if (linkWorkList != null) { - if (context.linkChildOpWithDummyOp.containsKey(mj)) { - for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) { - work.addDummyOp((HashTableDummyOperator) dummy); + if (context.linkOpWithWorkMap.containsKey(mj)) { + Map<BaseWork,TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj); + if (linkWorkMap != null) { + if (context.linkChildOpWithDummyOp.containsKey(mj)) { + for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) { + work.addDummyOp((HashTableDummyOperator) dummy); + } } - } - for (BaseWork parentWork : linkWorkList) { - LOG.debug("connecting "+parentWork.getName()+" with "+work.getName()); - tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE); - - // need to set up output name for reduce sink now that we know the name - // of the downstream work - for (ReduceSinkOperator r: - context.linkWorkWithReduceSinkMap.get(parentWork)) { - if (r.getConf().getOutputName() != null) { - LOG.debug("Cloning reduce sink for multi-child broadcast edge"); - // we've already set this one up. Need to clone for the next work. - r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( - (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators()); - context.clonedReduceSinks.add(r); + for (Entry<BaseWork,TezEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) { + BaseWork parentWork = parentWorkMap.getKey(); + LOG.debug("connecting "+parentWork.getName()+" with "+work.getName()); + TezEdgeProperty edgeProp = parentWorkMap.getValue(); + tezWork.connect(parentWork, work, edgeProp); + + // need to set up output name for reduce sink now that we know the name + // of the downstream work + for (ReduceSinkOperator r: + context.linkWorkWithReduceSinkMap.get(parentWork)) { + if (r.getConf().getOutputName() != null) { + LOG.debug("Cloning reduce sink for multi-child broadcast edge"); + // we've already set this one up. Need to clone for the next work. 
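+                  // (the clone keeps the same parent operators but gets its own ReduceSinkDesc,
+                  // so each downstream work can carry its own output name)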
+ r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( + (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators()); + context.clonedReduceSinks.add(r); + } + r.getConf().setOutputName(work.getName()); + context.connectedReduceSinks.add(r); } - r.getConf().setOutputName(work.getName()); - context.connectedReduceSinks.add(r); } } } @@ -221,7 +227,8 @@ public class GenTezWork implements NodeP // finally hook everything up LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")"); - tezWork.connect(unionWork, work, EdgeType.CONTAINS); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS); + tezWork.connect(unionWork, work, edgeProp); unionWork.addUnionOperators(context.currentUnionOperators); context.currentUnionOperators.clear(); context.workWithUnionOperators.add(work); @@ -261,7 +268,8 @@ public class GenTezWork implements NodeP if (!context.connectedReduceSinks.contains(rs)) { // add dependency between the two work items - tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + tezWork.connect(work, rWork, edgeProp); context.connectedReduceSinks.add(rs); } } else { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Fri Mar 28 06:11:07 2014 @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.W import org.apache.hadoop.hive.ql.lib.CompositeProcessor; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.ForwardWalker; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; @@ -117,7 +118,7 @@ public class TezCompiler extends TaskCom Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); List<Node> topNodes = new ArrayList<Node>(); topNodes.addAll(pCtx.getTopOps().values()); - GraphWalker ogw = new TezWalker(disp); + GraphWalker ogw = new ForwardWalker(disp); ogw.startWalking(topNodes, null); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Fri Mar 28 06:11:07 2014 @@ -22,6 +22,7 @@ public class AbstractOperatorDesc implem protected boolean vectorMode = false; protected transient Statistics statistics; + protected transient OpTraits opTraits; @Override @Explain(skipHeader = true, displayName = "Statistics") @@ -42,4 +43,12 @@ public class AbstractOperatorDesc implem public void setVectorMode(boolean vm) { this.vectorMode = vm; } + + public OpTraits getOpTraits() { + return opTraits; + } + + public void setOpTraits(OpTraits opTraits) { + this.opTraits = opTraits; + } } Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1582623&r1=1582622&r2=1582623&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Fri Mar 28 06:11:07 2014
@@ -49,6 +49,9 @@ public class MapJoinDesc extends JoinDes
   // for tez. used to remember which position maps to which logical input
   private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
+
+  // for tez. used to remember whether this is a custom bucket map join.
+  private boolean customBucketMapJoin;
 
   // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
@@ -81,6 +84,7 @@ public class MapJoinDesc extends JoinDes
     this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping;
     this.dumpFilePrefix = clone.dumpFilePrefix;
     this.parentToInput = clone.parentToInput;
+    this.customBucketMapJoin = clone.customBucketMapJoin;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -280,4 +284,12 @@ public class MapJoinDesc extends JoinDes
   public float getHashTableMemoryUsage() {
     return hashtableMemoryUsage;
   }
+
+  public void setCustomBucketMapJoin(boolean customBucketMapJoin) {
+    this.customBucketMapJoin = customBucketMapJoin;
+  }
+
+  public boolean getCustomBucketMapJoin() {
+    return this.customBucketMapJoin;
+  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java?rev=1582623&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java Fri Mar 28 06:11:07 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; + +public class OpTraits { + + List<List<String>> bucketColNames; + int numBuckets; + + public OpTraits(List<List<String>> bucketColNames, int numBuckets) { + this.bucketColNames = bucketColNames; + this.numBuckets = numBuckets; + } + + public List<List<String>> getBucketColNames() { + return bucketColNames; + } + + public int getNumBuckets() { + return numBuckets; + } + + public void setBucketColNames(List<List<String>> bucketColNames) { + this.bucketColNames = bucketColNames; + } + + public void setNumBuckets(int numBuckets) { + this.numBuckets = numBuckets; + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java Fri Mar 28 06:11:07 2014 @@ -24,4 +24,6 @@ public interface OperatorDesc extends Se public Object clone() throws CloneNotSupportedException; public Statistics getStatistics(); public void setStatistics(Statistics statistics); + public OpTraits getOpTraits(); + public void setOpTraits(OpTraits opTraits); } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java?rev=1582623&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java Fri Mar 28 06:11:07 2014 @@ -0,0 +1,45 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; + +public class TezEdgeProperty { + + public enum EdgeType { + SIMPLE_EDGE, + BROADCAST_EDGE, + CONTAINS, + CUSTOM_EDGE, + CUSTOM_SIMPLE_EDGE, + } + + private HiveConf hiveConf; + private EdgeType edgeType; + private int numBuckets; + + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, + int buckets) { + this.hiveConf = hiveConf; + this.edgeType = edgeType; + this.numBuckets = buckets; + } + + public TezEdgeProperty(EdgeType edgeType) { + this(null, edgeType, -1); + } + + public EdgeType getEdgeType() { + return edgeType; + } + + public HiveConf getHiveConf () { + return hiveConf; + } + + public int getNumBuckets() { + return numBuckets; + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Fri Mar 28 06:11:07 2014 @@ -32,6 +32,8 @@ import org.apache.commons.lang3.tuple.Im import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.tez.dag.api.EdgeProperty; /** * TezWork. This class encapsulates all the work objects that can be executed @@ -43,12 +45,6 @@ import org.apache.commons.logging.LogFac @Explain(displayName = "Tez") public class TezWork extends AbstractOperatorDesc { - public enum EdgeType { - SIMPLE_EDGE, - BROADCAST_EDGE, - CONTAINS - } - private static transient final Log LOG = LogFactory.getLog(TezWork.class); private static int counter; @@ -57,8 +53,8 @@ public class TezWork extends AbstractOpe private final Set<BaseWork> leaves = new HashSet<BaseWork>(); private final Map<BaseWork, List<BaseWork>> workGraph = new HashMap<BaseWork, List<BaseWork>>(); private final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new HashMap<BaseWork, List<BaseWork>>(); - private final Map<Pair<BaseWork, BaseWork>, EdgeType> edgeProperties = - new HashMap<Pair<BaseWork, BaseWork>, EdgeType>(); + private final Map<Pair<BaseWork, BaseWork>, TezEdgeProperty> edgeProperties = + new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>(); public TezWork(String name) { this.name = name + ":" + (++counter); @@ -147,19 +143,6 @@ public class TezWork extends AbstractOpe } /** - * connect adds an edge between a and b. Both nodes have - * to be added prior to calling connect. - */ - public void connect(BaseWork a, BaseWork b, EdgeType edgeType) { - workGraph.get(a).add(b); - invertedWorkGraph.get(b).add(a); - roots.remove(b); - leaves.remove(a); - ImmutablePair workPair = new ImmutablePair(a, b); - edgeProperties.put(workPair, edgeType); - } - - /** * disconnect removes an edge between a and b. Both a and * b have to be in the graph. If there is no matching edge * no change happens. @@ -242,10 +225,14 @@ public class TezWork extends AbstractOpe invertedWorkGraph.remove(work); } + public EdgeType getEdgeType(BaseWork a, BaseWork b) { + return edgeProperties.get(new ImmutablePair(a,b)).getEdgeType(); + } + /** * returns the edge type connecting work a and b */ - public EdgeType getEdgeProperty(BaseWork a, BaseWork b) { + public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) { return edgeProperties.get(new ImmutablePair(a,b)); } @@ -275,7 +262,7 @@ public class TezWork extends AbstractOpe for (BaseWork d: entry.getValue()) { Dependency dependency = new Dependency(); dependency.w = d; - dependency.type = getEdgeProperty(d, entry.getKey()); + dependency.type = getEdgeType(d, entry.getKey()); dependencies.add(dependency); } if (!dependencies.isEmpty()) { @@ -284,4 +271,19 @@ public class TezWork extends AbstractOpe } return result; } + + /** + * connect adds an edge between a and b. Both nodes have + * to be added prior to calling connect. 
+ * @param + */ + public void connect(BaseWork a, BaseWork b, + TezEdgeProperty edgeProp) { + workGraph.get(a).add(b); + invertedWorkGraph.get(b).add(a); + roots.remove(b); + leaves.remove(a); + ImmutablePair workPair = new ImmutablePair(a, b); + edgeProperties.put(workPair, edgeProp); + } } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Fri Mar 28 06:11:07 2014 @@ -47,8 +47,9 @@ import org.apache.hadoop.hive.ql.plan.Ba import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -92,7 +93,7 @@ public class TestTezTask { when(path.getFileSystem(any(Configuration.class))).thenReturn(fs); when(utils.getTezDir(any(Path.class))).thenReturn(path); when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class), any(LocalResource.class), - any(List.class), any(FileSystem.class), any(Context.class), anyBoolean())).thenAnswer(new Answer<Vertex>() { + any(List.class), any(FileSystem.class), any(Context.class), anyBoolean(), any(TezWork.class))).thenAnswer(new Answer<Vertex>() { @Override public Vertex answer(InvocationOnMock invocation) throws Throwable { @@ -103,7 +104,7 @@ public class TestTezTask { }); when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class), - any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>() { + any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() { @Override public Edge answer(InvocationOnMock invocation) throws Throwable { @@ -145,9 +146,10 @@ public class TestTezTask { rws[0].setReducer(op); rws[1].setReducer(op); - work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE); - work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE); - work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + work.connect(mws[0], rws[0], edgeProp); + work.connect(mws[1], rws[0], edgeProp); + work.connect(rws[0], rws[1], edgeProp); task = new TezTask(utils); task.setWork(work); Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1582623&r1=1582622&r2=1582623&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java Fri Mar 28 06:11:07 2014 @@ -22,7 +22,7 @@ import java.util.List; import junit.framework.Assert; -import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; 
import org.junit.Before; import org.junit.Test; @@ -62,7 +62,8 @@ public class TestTezWork { BaseWork parent = nodes.get(0); BaseWork child = nodes.get(1); - work.connect(parent, child, EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + work.connect(parent, child, edgeProp); Assert.assertEquals(work.getParents(child).size(), 1); Assert.assertEquals(work.getChildren(parent).size(), 1); @@ -78,7 +79,7 @@ public class TestTezWork { Assert.assertEquals(work.getChildren(w).size(), 0); } - Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.SIMPLE_EDGE); + Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.SIMPLE_EDGE); } @Test @@ -86,7 +87,8 @@ public class TestTezWork { BaseWork parent = nodes.get(0); BaseWork child = nodes.get(1); - work.connect(parent, child, EdgeType.BROADCAST_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.BROADCAST_EDGE); + work.connect(parent, child, edgeProp); Assert.assertEquals(work.getParents(child).size(), 1); Assert.assertEquals(work.getChildren(parent).size(), 1); @@ -102,7 +104,7 @@ public class TestTezWork { Assert.assertEquals(work.getChildren(w).size(), 0); } - Assert.assertEquals(work.getEdgeProperty(parent, child), EdgeType.BROADCAST_EDGE); + Assert.assertEquals(work.getEdgeProperty(parent, child).getEdgeType(), EdgeType.BROADCAST_EDGE); } @Test @@ -110,8 +112,9 @@ public class TestTezWork { BaseWork parent = nodes.get(0); BaseWork children[] = {nodes.get(1), nodes.get(2)}; - work.connect(parent, children[0], EdgeType.SIMPLE_EDGE); - work.connect(parent, children[1], EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + work.connect(parent, children[0], edgeProp); + work.connect(parent, children[1], edgeProp); work.disconnect(parent, children[0]); @@ -128,8 +131,9 @@ public class TestTezWork { BaseWork parent = nodes.get(0); BaseWork children[] = {nodes.get(1), nodes.get(2)}; - work.connect(parent, children[0], EdgeType.SIMPLE_EDGE); - work.connect(parent, children[1], EdgeType.SIMPLE_EDGE); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); + work.connect(parent, children[0], edgeProp); + work.connect(parent, children[1], edgeProp); work.remove(parent); @@ -142,8 +146,9 @@ public class TestTezWork { @Test public void testGetAllWork() throws Exception { + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE); for (int i = 4; i > 0; --i) { - work.connect(nodes.get(i), nodes.get(i-1), EdgeType.SIMPLE_EDGE); + work.connect(nodes.get(i), nodes.get(i-1), edgeProp); } List<BaseWork> sorted = work.getAllWork(); Added: hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q?rev=1582623&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q Fri Mar 28 06:11:07 2014 @@ -0,0 +1,85 @@ +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10000; + +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; 
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting = true;
+set hive.optimize.bucketingsorting=false;
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key;
+
+-- only one side is genuinely bucketed; srcbucket_mapjoin is not really a bucketed table.
+-- In this case the sub-query is chosen as the big table.
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab on tab_part.key = tab.key GROUP BY tab.key) a
+join tab b on a.k1 = b.key;
+
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y on x.key = y.key GROUP BY x.key) a
+join tab_part b on a.k1 = b.key;
+
+-- multi-way join
+explain
+select a.key, a.value, b.value
+from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key;
+
+explain
+select a.key, a.value, c.value
+from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a join tab c on a.key = c.key;
+
+-- in this case the sub-query is the small table
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+set hive.map.aggr=false;
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+join tab_part b on a.key = b.key;
+
+-- a join on a non-bucketed column results in a broadcast join.
+explain +select a.key, a.value, b.value +from tab a join tab_part b on a.value = b.value; + +CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab1 +select key,value from srcbucket_mapjoin; + +explain +select a.key, a.value, b.value +from tab1 a join tab_part b on a.key = b.key; + +explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; + + Added: hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q?rev=1582623&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q Fri Mar 28 06:11:07 2014 @@ -0,0 +1,50 @@ +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=10000; + +CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08'); + +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); +load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08'); + +set hive.enforce.bucketing=true; +set hive.enforce.sorting = true; +set hive.optimize.bucketingsorting=false; +insert overwrite table tab_part partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; + +CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin; + +set hive.convert.join.bucket.mapjoin.tez = true; + +explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; + +CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab1 +select key,value from srcbucket_mapjoin; + +explain +select a.key, a.value, b.value +from tab1 a join src b on a.key = b.key; + +explain +select a.key, b.key from (select key from tab_part where key > 1) a join (select key from tab_part where key > 2) b on a.key = b.key; + +explain +select a.key, b.key from (select key from tab_part where key > 1) a left outer join (select key from tab_part where key > 2) b on a.key = b.key; + +explain +select a.key, b.key from (select key from tab_part where key > 1) a right outer join (select key from tab_part where key > 2) b on a.key = b.key; + 
+explain select a.key, b.key from (select distinct key from tab) a join tab b on b.key = a.key; + +explain select a.value, b.value from (select distinct value from tab) a join tab b on b.key = a.value;
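For readers following the API change: TezWork.connect now takes a TezEdgeProperty instead of a bare EdgeType, so bucketing information can travel with the edge. A small sketch of the new calls, assuming the TezWork and BaseWork instances already exist; all variable names and the bucket count of 4 are illustrative:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
    import org.apache.hadoop.hive.ql.plan.TezWork;

    // sketch: wiring edges with the new API (names are illustrative)
    void wireEdges(TezWork tezWork, BaseWork mapWork, BaseWork joinWork,
        BaseWork reduceWork, HiveConf conf) {
      // a custom (bucketed) edge carries the conf and the bucket count
      TezEdgeProperty customEdge = new TezEdgeProperty(conf, EdgeType.CUSTOM_EDGE, 4);
      tezWork.connect(mapWork, joinWork, customEdge);

      // a plain shuffle edge only needs the edge type; conf defaults to null, buckets to -1
      tezWork.connect(joinWork, reduceWork, new TezEdgeProperty(EdgeType.SIMPLE_EDGE));

      // consumers can ask for the full property or just the type
      TezEdgeProperty prop = tezWork.getEdgeProperty(mapWork, joinWork);
      EdgeType type = tezWork.getEdgeType(mapWork, joinWork);
    }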