HIVE-18350 : load data should rename files consistent with insert statements. (Deepak Jaiswal, reviewed by Sergey Shelukhin and Ashutosh Chauhan)
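Context for the changes below: both the new LOAD DATA validation in LoadSemanticAnalyzer and the new split-to-bucket mapping in CustomPartitionVertex rely on bucket files being named after their bucket (e.g. 000000_0 or 000001_0_copy_1, where the leading digit run is the bucket id, and -1 means "not a valid bucket file"). The following self-contained sketch only illustrates that naming convention; the class, method name, and regex here are hypothetical and are not Hive's actual implementation, which the diff reaches through Utilities.getBucketFileNameFromPathSubString and Utilities.getBucketIdFromFile.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketFileNameSketch {
  // Leading digit run before the first underscore is the bucket id;
  // an optional "_copy_N" suffix marks additional files for the same bucket.
  private static final Pattern BUCKET_FILE = Pattern.compile("^(\\d+)_\\d+(_copy_\\d+)?$");

  // Returns the bucket id encoded in the file name, or -1 if the name
  // does not follow the bucket naming convention (assumption: simplified rule).
  static int bucketIdFromFileName(String fileName) {
    Matcher m = BUCKET_FILE.matcher(fileName);
    return m.matches() ? Integer.parseInt(m.group(1)) : -1;
  }

  public static void main(String[] args) {
    System.out.println(bucketIdFromFileName("000000_0"));        // 0
    System.out.println(bucketIdFromFileName("000001_0_copy_1")); // 1
    System.out.println(bucketIdFromFileName("somefile.txt"));    // -1 -> rejected by LOAD, or vertex falls back
  }
}

With a convention like this in place, LOAD DATA into a bucketed table can be validated up front (every file must carry a valid bucket id, with at most one file per bucket per statement), and CustomPartitionVertex can derive each split's bucket from its file name instead of relying on alphanumeric file ordering.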
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6e9b63e4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6e9b63e4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6e9b63e4 Branch: refs/heads/master Commit: 6e9b63e48b4f34ba26a6eefb354b0c94ee82256c Parents: 1faadb0 Author: Deepak Jaiswal <djais...@apache.org> Authored: Thu Feb 8 00:46:28 2018 -0800 Committer: Deepak Jaiswal <djais...@apache.org> Committed: Thu Feb 8 00:48:20 2018 -0800 ---------------------------------------------------------------------- .../hive/ql/exec/tez/CustomPartitionVertex.java | 71 +- .../ql/exec/tez/CustomVertexConfiguration.java | 31 +- .../hadoop/hive/ql/exec/tez/DagUtils.java | 16 +- .../apache/hadoop/hive/ql/metadata/Table.java | 16 +- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 54 +- .../annotation/OpTraitsRulesProcFactory.java | 35 +- .../optimizer/spark/SparkMapJoinOptimizer.java | 3 +- .../hive/ql/parse/LoadSemanticAnalyzer.java | 43 + .../apache/hadoop/hive/ql/plan/OpTraits.java | 22 +- .../hadoop/hive/ql/metadata/TestHive.java | 9 +- .../clientpositive/auto_sortmerge_join_2.q | 12 +- .../clientpositive/auto_sortmerge_join_4.q | 6 +- .../clientpositive/auto_sortmerge_join_5.q | 10 +- .../clientpositive/auto_sortmerge_join_7.q | 8 +- .../bucket_mapjoin_mismatch1.q.out | 170 - .../clientpositive/auto_sortmerge_join_2.q.out | 196 +- .../clientpositive/auto_sortmerge_join_4.q.out | 104 +- .../clientpositive/auto_sortmerge_join_5.q.out | 158 +- .../clientpositive/auto_sortmerge_join_7.q.out | 168 +- .../llap/auto_sortmerge_join_2.q.out | 132 +- .../llap/auto_sortmerge_join_4.q.out | 74 +- .../llap/auto_sortmerge_join_5.q.out | 112 +- .../llap/auto_sortmerge_join_7.q.out | 114 +- .../spark/auto_sortmerge_join_2.q.out | 104 +- .../spark/auto_sortmerge_join_4.q.out | 74 +- .../spark/auto_sortmerge_join_5.q.out | 118 +- .../spark/auto_sortmerge_join_7.q.out | 114 +- .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2240 +++++----- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3888 +++++++++--------- .../gen/thrift/gen-cpp/hive_metastore_types.h | 32 +- .../hive/metastore/api/BucketingVersion.java | 48 + .../apache/hadoop/hive/metastore/api/Table.java | 230 +- .../src/gen/thrift/gen-php/metastore/Types.php | 57 + .../gen/thrift/gen-py/hive_metastore/ttypes.py | 45 +- .../gen/thrift/gen-rb/hive_metastore_types.rb | 17 +- .../hadoop/hive/metastore/ObjectStore.java | 5 +- .../hadoop/hive/metastore/model/MTable.java | 37 +- .../src/main/thrift/hive_metastore.thrift | 11 +- .../hive/metastore/cache/TestCachedStore.java | 18 +- .../TestTablesCreateDropAlterTruncate.java | 19 +- 40 files changed, 4676 insertions(+), 3945 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index 26afe90..ef148d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -108,14 +108,15 @@ public class CustomPartitionVertex extends VertexManagerPlugin { private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create(); 
private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap = - new HashMap<String, Multimap<Integer, InputSplit>>(); + new HashMap<>(); private int numInputsAffectingRootInputSpecUpdate = 1; private int numInputsSeenSoFar = 0; private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap(); private final List<InputSplit> finalSplits = Lists.newLinkedList(); private final Map<String, InputSpecUpdate> inputNameInputSpecMap = - new HashMap<String, InputSpecUpdate>(); + new HashMap<>(); + private Map<String, Integer> inputToBucketMap; public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); @@ -137,6 +138,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin { this.mainWorkName = vertexConf.getInputName(); this.vertexType = vertexConf.getVertexType(); this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs(); + this.inputToBucketMap = vertexConf.getInputToBucketMap(); } @Override @@ -242,7 +244,7 @@ public class CustomPartitionVertex extends VertexManagerPlugin { } Multimap<Integer, InputSplit> bucketToInitialSplitMap = - getBucketSplitMapForPath(pathFileSplitsMap); + getBucketSplitMapForPath(inputName, pathFileSplitsMap); try { int totalResource = context.getTotalAvailableResource().getMemory(); @@ -532,20 +534,47 @@ public class CustomPartitionVertex extends VertexManagerPlugin { /* * This method generates the map of bucket to file splits. */ - private Multimap<Integer, InputSplit> getBucketSplitMapForPath( + private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName, Map<String, Set<FileSplit>> pathFileSplitsMap) { - int bucketNum = 0; Multimap<Integer, InputSplit> bucketToInitialSplitMap = - ArrayListMultimap.<Integer, InputSplit> create(); + ArrayListMultimap.create(); + boolean fallback = false; + List<Integer> bucketIds = new ArrayList<>(); for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) { - int bucketId = bucketNum % numBuckets; + // Extract the bucketId from pathFileSplitsMap; this is the more accurate method. + // However, it may not work in certain cases where buckets are named + // after the files used while loading data. In such cases, fall back to the + // old, potentially inaccurate method. + // The accepted file names look like 000000_0, 000001_0_copy_1. + String bucketIdStr = + Utilities.getBucketFileNameFromPathSubString(entry.getKey()); + int bucketId = Utilities.getBucketIdFromFile(bucketIdStr); + if (bucketId == -1) { + fallback = true; + LOG.info("Fallback to using older sort based logic to assign " + + "buckets to splits."); + bucketIds.clear(); + break; + } + bucketIds.add(bucketId); for (FileSplit fsplit : entry.getValue()) { bucketToInitialSplitMap.put(bucketId, fsplit); } - bucketNum++; + } + + int bucketNum = 0; + if (fallback) { + // This is the old logic, which assumes that the filenames are sorted in + // alphanumeric order and maps them to the corresponding bucket number. + for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) { + for (FileSplit fsplit : entry.getValue()) { + bucketToInitialSplitMap.put(bucketNum, fsplit); + } + bucketNum++; + } } // this is just for SMB join use-case. The numBuckets would be equal to that of the big table @@ -553,16 +582,28 @@ public class CustomPartitionVertex extends VertexManagerPlugin { // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as // well.
- if (bucketNum < numBuckets) { - int loopedBucketId = 0; - for (; bucketNum < numBuckets; bucketNum++) { - for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) { - bucketToInitialSplitMap.put(bucketNum, fsplit); + if (numInputsAffectingRootInputSpecUpdate != 1 && + inputName.compareTo(mainWorkName) != 0) { + // small table + int inputNumBuckets = inputToBucketMap.get(inputName); + if (fallback && bucketNum != inputNumBuckets) { + // The fallback mechanism kicked in which only works correctly if + // there exists a file for each bucket, else it may result in wrong + // result. Throw an error + + } + if (inputNumBuckets < numBuckets) { + // Need to send the splits to multiple buckets + for (int i = 1; i < numBuckets/inputNumBuckets; i++) { + int bucketIdBase = i * inputNumBuckets; + for (Integer bucketId : bucketIds) { + for (InputSplit fsplit : bucketToInitialSplitMap.get(bucketId)) { + bucketToInitialSplitMap.put(bucketIdBase + bucketId, fsplit); + } + } } - loopedBucketId++; } } - return bucketToInitialSplitMap; } } http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java index ef5e7ed..4301829 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java @@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.io.Writable; @@ -39,22 +42,24 @@ public class CustomVertexConfiguration implements Writable { private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; private int numInputs; private String inputName; + private Map<String, Integer> inputToBucketMap; public CustomVertexConfiguration() { } // this is the constructor to use for the Bucket map join case. public CustomVertexConfiguration(int numBuckets, VertexType vertexType) { - this(numBuckets, vertexType, "", 1); + this(numBuckets, vertexType, "", 1, null); } // this is the constructor to use for SMB. public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName, - int numInputs) { + int numInputs, Map<String, Integer> inputToBucketMap) { this.numBuckets = numBuckets; this.vertexType = vertexType; this.numInputs = numInputs; this.inputName = inputName; + this.inputToBucketMap = inputToBucketMap; } @Override @@ -63,6 +68,14 @@ public class CustomVertexConfiguration implements Writable { out.writeInt(this.numBuckets); out.writeInt(numInputs); out.writeUTF(inputName); + int sz = inputToBucketMap != null ? 
inputToBucketMap.size() : 0; + out.writeInt(sz); + if (sz > 0) { + for (Map.Entry<String, Integer> entry : inputToBucketMap.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeInt(entry.getValue()); + } + } } @Override @@ -71,6 +84,16 @@ public class CustomVertexConfiguration implements Writable { this.numBuckets = in.readInt(); this.numInputs = in.readInt(); this.inputName = in.readUTF(); + int sz = in.readInt(); + Preconditions.checkState(sz >= 0); + if (sz == 0) { + this.inputToBucketMap = null; + } else { + this.inputToBucketMap = new HashMap<>(); + for (int i = 0; i < sz; i++) { + this.inputToBucketMap.put(in.readUTF(), in.readInt()); + } + } } public int getNumBuckets() { @@ -88,4 +111,8 @@ public class CustomVertexConfiguration implements Writable { public int getNumInputs() { return numInputs; } + + public Map<String, Integer> getInputToBucketMap() { + return inputToBucketMap; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 9885038..0e75f6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import javax.security.auth.login.LoginException; @@ -568,13 +569,26 @@ public class DagUtils { MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build()); } + // To be populated for SMB joins only for all the small tables + Map<String, Integer> inputToBucketMap = new HashMap<>(); + if (mergeJoinWork.getMergeJoinOperator().getParentOperators().size() == 1 + && mergeJoinWork.getMergeJoinOperator().getOpTraits() != null) { + // This is an SMB join. + for (BaseWork work : mapWorkList) { + MapWork mw = (MapWork) work; + Map<String, Operator<?>> aliasToWork = mw.getAliasToWork(); + Preconditions.checkState(aliasToWork.size() == 1, + "More than 1 alias in SMB mapwork"); + inputToBucketMap.put(mw.getName(), mw.getWorks().get(0).getOpTraits().getNumBuckets()); + } + } VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); // the +1 to the size is because of the main work. 
CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(), - mapWorkList.size() + 1); + mapWorkList.size() + 1, inputToBucketMap); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); byte[] userPayload = dob.getData(); http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 632a213..4ee0579 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -42,15 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; -import org.apache.hadoop.hive.metastore.api.CreationMetadata; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -188,6 +180,8 @@ public class Table implements Serializable { t.setOwner(SessionState.getUserFromAuthenticator()); // set create time t.setCreateTime((int) (System.currentTimeMillis() / 1000)); + t.setBucketingVersion(BucketingVersion.MURMUR_BUCKETING); + t.setLoadInBucketedTable(false); } return t; } @@ -676,6 +670,10 @@ public class Table implements Serializable { return tTable.getSd().getNumBuckets(); } + public BucketingVersion getBucketingVersion() { + return tTable.getBucketingVersion(); + } + public void setInputFormatClass(String name) throws HiveException { if (name == null) { inputFormatClass = null; http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index dc698c8..f30cf4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -26,9 +26,11 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.BucketingVersion; import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; @@ -180,7 +182,8 @@ public class 
ConvertJoinMapJoin implements NodeProcessor { MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true); // map join operator by default has no bucket cols and num of reduce sinks // reduced by 1 - mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks())); + mapJoinOp.setOpTraits(new OpTraits(null, -1, null, + joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion())); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) { @@ -378,7 +381,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { joinOp.getSchema()); int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks(); OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, - joinOp.getOpTraits().getSortCols(), numReduceSinks); + joinOp.getOpTraits().getSortCols(), numReduceSinks, joinOp.getOpTraits().getBucketingVersion()); mergeJoinOp.setOpTraits(opTraits); mergeJoinOp.setStatistics(joinOp.getStatistics()); @@ -445,7 +448,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { return; } currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), - opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks())); + opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks(), + opTraits.getBucketingVersion())); for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -498,7 +502,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { // we can set the traits for this join operator opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks()); + tezBucketJoinProcCtx.getNumBuckets(), null, + joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); @@ -576,6 +581,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { return false; } ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; + // If the chosen big table has fewer buckets than any of the + // small tables, then those buckets will have no mapping to any of the + // big table buckets, resulting in wrong results. + if (numBuckets > 0 && numBuckets < rsOp.getOpTraits().getNumBuckets()) { + LOG.info("Small table has more buckets than big table."); + return false; + } if (!checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), false)) { LOG.info("We cannot convert to SMB because the sort column names do not match."); @@ -593,6 +605,37 @@ public class ConvertJoinMapJoin implements NodeProcessor { numBuckets = bigTableRS.getConf().getNumReducers(); } tezBucketJoinProcCtx.setNumBuckets(numBuckets); + + // Bucketing can use two different versions: version 1 for existing + // tables and version 2 for new tables. All the inputs to the SMB must be + // from the same version. This only applies to tables read directly and not + // to intermediate outputs of joins/groupbys. + BucketingVersion version = BucketingVersion.INVALID_BUCKETING; + for (Operator<?
extends OperatorDesc> parentOp : joinOp.getParentOperators()) { + // Check if the parent is coming from a table scan, if so, what is the version of it. + assert parentOp.getParentOperators() != null && parentOp.getParentOperators().size() == 1; + Operator<?> op = parentOp.getParentOperators().get(0); + while(op != null && !(op instanceof TableScanOperator + || op instanceof ReduceSinkOperator + || op instanceof CommonJoinOperator)) { + // If op has parents it is guaranteed to be 1. + List<Operator<?>> parents = op.getParentOperators(); + Preconditions.checkState(parents.size() == 0 || parents.size() == 1); + op = parents.size() == 1 ? parents.get(0) : null; + } + + if (op instanceof TableScanOperator) { + BucketingVersion localVersion = ((TableScanOperator)op).getConf(). + getTableMetadata().getBucketingVersion(); + if (version == BucketingVersion.INVALID_BUCKETING) { + version = localVersion; + } else if (version != localVersion) { + // versions dont match, return false. + LOG.debug("SMB Join can't be performed due to bucketing version mismatch"); + return false; + } + } + } LOG.info("We can convert the join to an SMB join."); return true; } @@ -1168,7 +1211,8 @@ public class ConvertJoinMapJoin implements NodeProcessor { joinOp.getOpTraits().getBucketColNames(), numReducers, null, - joinOp.getOpTraits().getNumReduceSinks()); + joinOp.getOpTraits().getNumReduceSinks(), + joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 69d9f31..7696402 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -22,6 +22,7 @@ import java.util.*; import java.util.Map.Entry; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.BucketingVersion; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -92,10 +93,12 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); int numBuckets = -1; int numReduceSinks = 1; + BucketingVersion bucketingVersion = BucketingVersion.INVALID_BUCKETING; OpTraits parentOpTraits = rs.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); numReduceSinks += parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } List<String> bucketCols = new ArrayList<>(); @@ -134,7 +137,8 @@ public class OpTraitsRulesProcFactory { } listBucketCols.add(bucketCols); - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, + listBucketCols, numReduceSinks, bucketingVersion); rs.setOpTraits(opTraits); return null; } @@ -213,7 +217,8 @@ public class OpTraitsRulesProcFactory { 
sortedColsList.add(sortCols); } // num reduce sinks hardcoded to 0 because TS has no parents - OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0); + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, + sortedColsList, 0, table.getBucketingVersion()); ts.setOpTraits(opTraits); return null; } @@ -239,12 +244,15 @@ public class OpTraitsRulesProcFactory { List<List<String>> listBucketCols = new ArrayList<List<String>>(); int numReduceSinks = 0; + BucketingVersion bucketingVersion = BucketingVersion.INVALID_BUCKETING; OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numReduceSinks = parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, + numReduceSinks, bucketingVersion); gbyOp.setOpTraits(opTraits); return null; } @@ -298,12 +306,15 @@ public class OpTraitsRulesProcFactory { int numBuckets = -1; int numReduceSinks = 0; + BucketingVersion bucketingVersion = BucketingVersion.INVALID_BUCKETING; OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); numReduceSinks = parentOpTraits.getNumReduceSinks(); + bucketingVersion = parentOpTraits.getBucketingVersion(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, + numReduceSinks, bucketingVersion); selOp.setOpTraits(opTraits); return null; } @@ -319,6 +330,7 @@ public class OpTraitsRulesProcFactory { List<List<String>> sortColsList = new ArrayList<List<String>>(); byte pos = 0; int numReduceSinks = 0; // will be set to the larger of the parents + boolean bucketingVersionSeen = false; for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // can be mux operator @@ -338,7 +350,7 @@ public class OpTraitsRulesProcFactory { pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks, BucketingVersion.INVALID_BUCKETING)); return null; } @@ -392,6 +404,8 @@ public class OpTraitsRulesProcFactory { Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd; int numReduceSinks = 0; + BucketingVersion bucketingVersion = BucketingVersion.INVALID_BUCKETING; + boolean bucketingVersionSeen = false; for (Operator<?> parentOp : operator.getParentOperators()) { if (parentOp.getOpTraits() == null) { continue; @@ -399,8 +413,17 @@ public class OpTraitsRulesProcFactory { if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) { numReduceSinks = parentOp.getOpTraits().getNumReduceSinks(); } + // If there is mismatch in bucketingVersion, then it should be set to + // -1, that way SMB will be disabled. 
+ if (bucketingVersion == BucketingVersion.INVALID_BUCKETING && !bucketingVersionSeen) { + bucketingVersion = parentOp.getOpTraits().getBucketingVersion(); + bucketingVersionSeen = true; + } else if (bucketingVersion != parentOp.getOpTraits().getBucketingVersion()) { + bucketingVersion = BucketingVersion.INVALID_BUCKETING; + } } - OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks); + OpTraits opTraits = new OpTraits(null, -1, + null, numReduceSinks, bucketingVersion); operator.setOpTraits(opTraits); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index bacc444..39d2370 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -121,7 +121,8 @@ public class SparkMapJoinOptimizer implements NodeProcessor { } // we can set the traits for this join operator - OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks()); + OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, + joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 54f5bab..3619763 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -160,6 +161,48 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { "source contains directory: " + oneSrc.getPath().toString())); } } + // Do another loop if table is bucketed + List<String> bucketCols = table.getBucketCols(); + if (bucketCols != null && !bucketCols.isEmpty()) { + // Hive assumes that the user names the files after the corresponding + // bucket. For example, file names should follow the format 000000_0, 000001_0, etc. + // Here the 1st file belongs to bucket 0, the 2nd to bucket 1, and so on.
+ boolean[] bucketArray = new boolean[table.getNumBuckets()]; + // initialize the array + Arrays.fill(bucketArray, false); + int numBuckets = table.getNumBuckets(); + + for (FileStatus oneSrc : srcs) { + String bucketName = oneSrc.getPath().getName(); + + //get the bucket id + String bucketIdStr = + Utilities.getBucketFileNameFromPathSubString(bucketName); + int bucketId = Utilities.getBucketIdFromFile(bucketIdStr); + LOG.debug("bucket ID for file " + oneSrc.getPath() + " = " + bucketId + + " for table " + table.getFullyQualifiedName()); + if (bucketId == -1) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg( + "The file name is invalid : " + + oneSrc.getPath().toString() + " for table " + + table.getFullyQualifiedName())); + } + if (bucketId >= numBuckets) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg( + "The file name corresponds to invalid bucketId : " + + oneSrc.getPath().toString()) + + ". Maximum number of buckets can be " + numBuckets + + " for table " + table.getFullyQualifiedName()); + } + if (bucketArray[bucketId]) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg( + "Multiple files for same bucket : " + bucketId + + ". Only 1 file per bucket allowed in single load command. To load multiple files for same bucket, use multiple statements for table " + + table.getFullyQualifiedName())); + } + bucketArray[bucketId] = true; + } + } } catch (IOException e) { // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java index 9621c3b..0dcc229 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java @@ -18,21 +18,26 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.metastore.api.BucketingVersion; + import java.util.List; public class OpTraits { - List<List<String>> bucketColNames; - List<List<String>> sortColNames; - int numBuckets; - int numReduceSinks; + private List<List<String>> bucketColNames; + private List<List<String>> sortColNames; + private int numBuckets; + private int numReduceSinks; + private BucketingVersion bucketingVersion; public OpTraits(List<List<String>> bucketColNames, int numBuckets, - List<List<String>> sortColNames, int numReduceSinks) { + List<List<String>> sortColNames, int numReduceSinks, + BucketingVersion bucketingVersion) { this.bucketColNames = bucketColNames; this.numBuckets = numBuckets; this.sortColNames = sortColNames; this.numReduceSinks = numReduceSinks; + this.bucketingVersion = bucketingVersion; } public List<List<String>> getBucketColNames() { @@ -68,6 +73,13 @@ public class OpTraits { return this.numReduceSinks; } + public void setBucketingVersion(BucketingVersion bucketingVersion) { + this.bucketingVersion = bucketingVersion; + } + + public BucketingVersion getBucketingVersion() { + return bucketingVersion; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java 
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java index b5b478f..5355e06 100755 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java @@ -35,12 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.PartitionDropOptions; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.ql.index.HiveIndex; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.session.SessionState; @@ -171,6 +166,8 @@ public class TestHive extends TestCase { tbl.setStoredAsSubDirectories(false); tbl.setRewriteEnabled(false); + tbl.getTTable().setBucketingVersion(BucketingVersion.MURMUR_BUCKETING); + tbl.getTTable().setLoadInBucketedTable(false); // create table setNullCreateTableGrants(); http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q index e5fdcb5..b7bd10e 100644 --- a/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q +++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q @@ -1,19 +1,21 @@ set hive.strict.checks.bucketing=false; set hive.mapred.mode=nonstrict; --- small 1 part, 4 bucket & big 2 part, 2 bucket -CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +-- small 1 part, 2 bucket & big 2 part, 4 bucket +CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/small/000000_0' INTO TABLE bucket_small partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INTO TABLE bucket_small partition(ds='2008-04-08'); -load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small partition(ds='2008-04-08'); -load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small partition(ds='2008-04-08'); -CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath 
'../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-09'); load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big partition(ds='2008-04-09'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-09'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-09'); set hive.auto.convert.join=true; set hive.auto.convert.sortmerge.join=true; http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q index abf09e5..9f719ae 100644 --- a/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q +++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q @@ -1,7 +1,7 @@ set hive.strict.checks.bucketing=false; set hive.mapred.mode=nonstrict; --- small 2 part, 4 bucket & big 1 part, 2 bucket +-- small 2 part, 4 bucket & big 1 part, 4 bucket CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/small/000000_0' INTO TABLE bucket_small partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INTO TABLE bucket_small partition(ds='2008-04-08'); @@ -13,9 +13,11 @@ load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INT load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small partition(ds='2008-04-09'); load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small partition(ds='2008-04-09'); -CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-08'); set hive.auto.convert.join=true; set hive.auto.convert.sortmerge.join=true; http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q index b85c4a7..c107501 100644 --- a/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q +++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q @@ -1,19 +1,19 @@ set hive.strict.checks.bucketing=false; set 
hive.mapred.mode=nonstrict; --- small no part, 4 bucket & big no part, 2 bucket +-- small no part, 2 bucket & big no part, 4 bucket -- SORT_QUERY_RESULTS -CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/small/000000_0' INTO TABLE bucket_small; load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INTO TABLE bucket_small; -load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small; -load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small; -CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big; load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big; +load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big; +load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big; set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin = true; http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q index bd78086..a5cc04a 100644 --- a/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q +++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q @@ -1,7 +1,7 @@ set hive.strict.checks.bucketing=false; set hive.mapred.mode=nonstrict; --- small 2 part, 4 bucket & big 2 part, 2 bucket +-- small 2 part, 4 bucket & big 2 part, 4 bucket CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/small/000000_0' INTO TABLE bucket_small partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INTO TABLE bucket_small partition(ds='2008-04-08'); @@ -13,12 +13,16 @@ load data local inpath '../../data/files/auto_sortmerge_join/small/000001_0' INT load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small partition(ds='2008-04-09'); load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small partition(ds='2008-04-09'); -CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath 
'../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-08'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-08'); load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-09'); load data local inpath '../../data/files/auto_sortmerge_join/big/000001_0' INTO TABLE bucket_big partition(ds='2008-04-09'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-09'); +load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-09'); set hive.auto.convert.join=true; set hive.auto.convert.sortmerge.join=true; http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out b/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out index b9c2e6f..37dbbf9 100644 --- a/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out +++ b/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out @@ -53,174 +53,4 @@ POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@srcbucket_mapjoin_part_2 -PREHOOK: query: load data local inpath '../../data/files/bmj/000002_0' - INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08') -PREHOOK: type: LOAD -#### A masked pattern was here #### -PREHOOK: Output: default@srcbucket_mapjoin_part_2 -POSTHOOK: query: load data local inpath '../../data/files/bmj/000002_0' - INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08') -POSTHOOK: type: LOAD -#### A masked pattern was here #### -POSTHOOK: Output: default@srcbucket_mapjoin_part_2 -POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08 -PREHOOK: query: load data local inpath '../../data/files/bmj/000003_0' - INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08') -PREHOOK: type: LOAD -#### A masked pattern was here #### -PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08 -POSTHOOK: query: load data local inpath '../../data/files/bmj/000003_0' - INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08') -POSTHOOK: type: LOAD #### A masked pattern was here #### -POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08 -PREHOOK: query: explain -select a.key, a.value, b.value -from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b -on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08" -PREHOOK: type: QUERY -POSTHOOK: query: explain -select a.key, a.value, b.value -from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b -on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08" -POSTHOOK: type: QUERY -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Map Reduce - Map Operator Tree: - TableScan - alias: a - Statistics: Num rows: 108 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: key is not null (type: boolean) - Statistics: Num rows: 108 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: key (type: int), value (type: string) - outputColumnNames: _col0, _col1 - Statistics: 
Num rows: 108 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 108 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: string) - TableScan - alias: b - Statistics: Num rows: 78 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: key is not null (type: boolean) - Statistics: Num rows: 78 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: key (type: int), value (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 78 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 78 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: string) - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: int) - 1 _col0 (type: int) - outputColumnNames: _col0, _col1, _col4 - Statistics: Num rows: 118 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: _col0 (type: int), _col1 (type: string), _col4 (type: string) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 118 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 118 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - -PREHOOK: query: explain -select /*+mapjoin(b)*/ a.key, a.value, b.value -from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b -on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08" -PREHOOK: type: QUERY -POSTHOOK: query: explain -select /*+mapjoin(b)*/ a.key, a.value, b.value -from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b -on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08" -POSTHOOK: type: QUERY -STAGE DEPENDENCIES: - Stage-3 is a root stage - Stage-1 depends on stages: Stage-3 - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-3 - Map Reduce Local Work - Alias -> Map Local Tables: - b - Fetch Operator - limit: -1 - Alias -> Map Local Operator Tree: - b - TableScan - alias: b - Statistics: Num rows: 102 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: key is not null (type: boolean) - Statistics: Num rows: 102 Data size: 30620 Basic stats: COMPLETE Column stats: NONE - HashTable Sink Operator - keys: - 0 key (type: int) - 1 key (type: int) - - Stage: Stage-1 - Map Reduce - Map Operator Tree: - TableScan - alias: a - Statistics: Num rows: 140 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: key is not null (type: boolean) - Statistics: Num rows: 140 Data size: 42000 Basic stats: COMPLETE Column stats: NONE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 key (type: int) - 1 key (type: int) - outputColumnNames: _col0, _col1, _col7 - Statistics: Num rows: 154 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: 
_col0 (type: int), _col1 (type: string), _col7 (type: string) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 154 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 154 Data size: 46200 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Local Work: - Map Reduce Local Work - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - -FAILED: SemanticException [Error 10136]: Bucketed mapjoin cannot be performed. This can be due to multiple reasons: . Join columns dont match bucketed columns. . Number of buckets are not a multiple of each other. If you really want to perform the operation, either remove the mapjoin hint from your query or set hive.enforce.bucketmapjoin to false. http://git-wip-us.apache.org/repos/asf/hive/blob/6e9b63e4/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out b/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out index 5cfc35a..dda7211 100644 --- a/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out +++ b/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out @@ -1,8 +1,8 @@ -PREHOOK: query: CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +PREHOOK: query: CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@bucket_small -POSTHOOK: query: CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +POSTHOOK: query: CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@bucket_small @@ -23,27 +23,11 @@ POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/sm POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@bucket_small@ds=2008-04-08 -PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small partition(ds='2008-04-08') -PREHOOK: type: LOAD -#### A masked pattern was here #### -PREHOOK: Output: default@bucket_small@ds=2008-04-08 -POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/small/000002_0' INTO TABLE bucket_small partition(ds='2008-04-08') -POSTHOOK: type: LOAD -#### A masked pattern was here #### -POSTHOOK: Output: default@bucket_small@ds=2008-04-08 -PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small partition(ds='2008-04-08') -PREHOOK: type: LOAD -#### A masked pattern was here #### -PREHOOK: Output: default@bucket_small@ds=2008-04-08 -POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/small/000003_0' INTO TABLE bucket_small partition(ds='2008-04-08') -POSTHOOK: type: LOAD -#### A masked pattern 
was here #### -POSTHOOK: Output: default@bucket_small@ds=2008-04-08 -PREHOOK: query: CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: query: CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@bucket_big -POSTHOOK: query: CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: query: CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@bucket_big @@ -64,6 +48,22 @@ POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/bi POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@bucket_big@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@bucket_big@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@bucket_big@ds=2008-04-08 +PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-08') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@bucket_big@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-08') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@bucket_big@ds=2008-04-08 PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000000_0' INTO TABLE bucket_big partition(ds='2008-04-09') PREHOOK: type: LOAD #### A masked pattern was here #### @@ -81,6 +81,22 @@ POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/bi POSTHOOK: type: LOAD #### A masked pattern was here #### POSTHOOK: Output: default@bucket_big@ds=2008-04-09 +PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-09') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@bucket_big@ds=2008-04-09 +POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000002_0' INTO TABLE bucket_big partition(ds='2008-04-09') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@bucket_big@ds=2008-04-09 +PREHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-09') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@bucket_big@ds=2008-04-09 +POSTHOOK: query: load data local inpath '../../data/files/auto_sortmerge_join/big/000003_0' INTO TABLE bucket_big partition(ds='2008-04-09') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@bucket_big@ds=2008-04-09 
PREHOOK: query: explain extended select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key PREHOOK: type: QUERY POSTHOOK: query: explain extended select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key @@ -95,16 +111,16 @@ STAGE PLANS: Map Operator Tree: TableScan alias: a - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Sorted Merge Bucket Map Join Operator condition map: Inner Join 0 to 1 @@ -134,7 +150,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -142,7 +158,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -150,7 +166,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -158,7 +174,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -183,7 +199,7 @@ STAGE PLANS: partition values: ds 2008-04-09 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -191,7 +207,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -199,7 +215,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -207,7 +223,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -308,7 +324,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -316,7 +332,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_small - numFiles 4 + numFiles 2 numRows 0 partition_columns ds partition_columns.types string @@ -324,7 +340,7 @@ STAGE PLANS: serialization.ddl struct bucket_small { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 226 + 
totalSize 114 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -332,7 +348,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -353,16 +369,16 @@ STAGE PLANS: $hdt$_1:b TableScan alias: b - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator keys: 0 _col0 (type: string) @@ -374,16 +390,16 @@ STAGE PLANS: Map Operator Tree: TableScan alias: a - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 @@ -414,7 +430,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -422,7 +438,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -430,7 +446,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -438,7 +454,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -463,7 +479,7 @@ STAGE PLANS: partition values: ds 2008-04-09 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -471,7 +487,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -479,7 +495,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -487,7 +503,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -511,7 +527,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -519,7 +535,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_small - numFiles 4 + numFiles 2 numRows 0 partition_columns ds partition_columns.types string @@ -527,7 +543,7 @@ STAGE PLANS: serialization.ddl struct bucket_small { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 226 + totalSize 114 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -535,7 +551,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -597,7 +613,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -605,7 +621,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -613,7 +629,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -621,7 +637,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -645,7 +661,7 @@ STAGE PLANS: partition values: ds 2008-04-09 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -653,7 +669,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -661,7 +677,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -669,7 +685,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -690,16 +706,16 @@ STAGE PLANS: $hdt$_0:a TableScan alias: a - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + 
Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator keys: 0 _col0 (type: string) @@ -711,16 +727,16 @@ STAGE PLANS: Map Operator Tree: TableScan alias: b - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 4 Data size: 2260 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 2 Data size: 1140 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 @@ -751,7 +767,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -759,7 +775,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -767,7 +783,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -775,7 +791,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -800,7 +816,7 @@ STAGE PLANS: partition values: ds 2008-04-09 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -808,7 +824,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -816,7 +832,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -824,7 +840,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -848,7 +864,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -856,7 +872,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_small - numFiles 4 + numFiles 2 numRows 0 partition_columns ds partition_columns.types string @@ -864,7 +880,7 @@ STAGE PLANS: serialization.ddl struct bucket_small { string key, string value} 
serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 226 + totalSize 114 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -872,7 +888,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 4 + bucket_count 2 bucket_field_name key column.name.delimiter , columns key,value @@ -924,16 +940,16 @@ STAGE PLANS: Map Operator Tree: TableScan alias: a - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: key is not null (type: boolean) - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: key (type: string) outputColumnNames: _col0 - Statistics: Num rows: 112 Data size: 55000 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 240 Data size: 116240 Basic stats: COMPLETE Column stats: NONE Sorted Merge Bucket Map Join Operator condition map: Inner Join 0 to 1 @@ -963,7 +979,7 @@ STAGE PLANS: partition values: ds 2008-04-08 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -971,7 +987,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -979,7 +995,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -987,7 +1003,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -1012,7 +1028,7 @@ STAGE PLANS: partition values: ds 2008-04-09 properties: - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value @@ -1020,7 +1036,7 @@ STAGE PLANS: columns.types string:string #### A masked pattern was here #### name default.bucket_big - numFiles 2 + numFiles 4 numRows 0 partition_columns ds partition_columns.types string @@ -1028,7 +1044,7 @@ STAGE PLANS: serialization.ddl struct bucket_big { string key, string value} serialization.format 1 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - totalSize 2750 + totalSize 5812 #### A masked pattern was here #### serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe @@ -1036,7 +1052,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: SORTBUCKETCOLSPREFIX TRUE - bucket_count 2 + bucket_count 4 bucket_field_name key column.name.delimiter , columns key,value
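The remaining hunks only re-baseline the explain output for the same join: bucket_big's partitions move from bucket_count 2 / numFiles 2 / totalSize 2750 to 4 / 4 / 5812, bucket_small's from bucket_count 4 / numFiles 4 / totalSize 226 to 2 / 2 / 114, and the scan statistics scale accordingly (112 rows / 55000 bytes to 240 rows / 116240 bytes on the big side, 4 rows to 2 on the small side); the operator shape is unchanged in the portions shown. For reference, the query whose plans are being diffed, with bucket-map-join settings that such tests commonly enable (the exact SET list in auto_sortmerge_join_2.q is an assumption here, not visible in the diff):

    -- Assumed settings; the .q file may enable a different or larger set.
    SET hive.optimize.bucketmapjoin=true;
    SET hive.optimize.bucketmapjoin.sortedmerge=true;
    SET hive.auto.convert.sortmerge.join=true;

    EXPLAIN EXTENDED
    SELECT count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;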