http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java
index 86f466f..77c9ecc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSerializedImpl.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.exec.vector.keyseries;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
-import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.base.Preconditions;
+import org.apache.hive.common.util.Murmur3;
 
 /**
  * Implementation of base serialization interface.
@@ -103,7 +103,7 @@ public abstract class VectorKeySeriesSerializedImpl<T extends SerializeWrite>
     byte[] bytes = output.getData();
     for (int i = 0; i < nonNullKeyCount; i++) {
       keyLength = serializedKeyLengths[i];
-      hashCodes[i] = HashCodeUtil.murmurHash(bytes, offset, keyLength);
+      hashCodes[i] = Murmur3.hash32(bytes, offset, keyLength, 0);
      offset += keyLength;
    }
  }
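Note on the hunk above: the substance is swapping the legacy HashCodeUtil.murmurHash for Murmur3.hash32 with a fixed seed of 0, applied to each serialized key laid out back-to-back in the Output buffer. A minimal standalone sketch of that call pattern (assumes Hive's Murmur3 utility on the classpath; the key bytes and lengths here are made up for illustration):

    import java.nio.charset.StandardCharsets;
    import org.apache.hive.common.util.Murmur3;

    public class Murmur3Demo {
      public static void main(String[] args) {
        // Serialized keys packed back-to-back in one buffer, as in the Output stream.
        byte[] bytes = "key1key2".getBytes(StandardCharsets.UTF_8);
        int[] keyLengths = {4, 4};
        int offset = 0;
        for (int keyLength : keyLengths) {
          // Seed 0 matches the new call in computeSerializedHashCodes().
          int hash = Murmur3.hash32(bytes, offset, keyLength, 0);
          System.out.printf("key at offset %d: hash=%d%n", offset, hash);
          offset += keyLength;
        }
      }
    }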
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 1bc3fda..42b7784 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import com.google.common.base.Preconditions;
+import org.apache.hive.common.util.Murmur3;
 
 /**
  * This class is the object hash (not Uniform Hash) operator class for native vectorized reduce sink.
@@ -226,61 +228,110 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
     int[] selected = batch.selected;
     final int size = batch.size;
-    for (int logical = 0; logical < size; logical++) {
-      final int batchIndex = (selectedInUse ? selected[logical] : logical);
-
-      final int hashCode;
-      if (isEmptyBuckets) {
-        if (isEmptyPartitions) {
-          hashCode = nonPartitionRandom.nextInt();
-        } else {
+
+    // EmptyBuckets = true
+    if (isEmptyBuckets) {
+      if (isEmptyPartitions) {
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = (selectedInUse ? selected[logical] : logical);
+          final int hashCode = nonPartitionRandom.nextInt();
+          postProcess(batch, batchIndex, tag, hashCode);
+        }
+      } else { // isEmptyPartition = false
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = (selectedInUse ? selected[logical] : logical);
           partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-          hashCode =
+          final int hashCode =
               bucketingVersion == 2 && !vectorDesc.getIsAcidChange() ?
               ObjectInspectorUtils.getBucketHashCode(
+                  partitionFieldValues, partitionObjectInspectors) :
+              ObjectInspectorUtils.getBucketHashCodeOld(
                  partitionFieldValues, partitionObjectInspectors);
+          postProcess(batch, batchIndex, tag, hashCode);
         }
-      } else {
-        bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-        final int bucketNum =
-            ObjectInspectorUtils.getBucketNumber(
+      }
+    } else { // EmptyBuckets = false
+      if (isEmptyPartitions) {
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = (selectedInUse ? selected[logical] : logical);
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int bucketNum = bucketingVersion == 2 ?
+              ObjectInspectorUtils.getBucketNumber(bucketFieldValues,
+                  bucketObjectInspectors, numBuckets) :
+              ObjectInspectorUtils.getBucketNumberOld(
                 bucketFieldValues, bucketObjectInspectors, numBuckets);
-        if (isEmptyPartitions) {
-          hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
-        } else {
+          final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
+          postProcess(batch, batchIndex, tag, hashCode);
+        }
+      } else { // isEmptyPartition = false
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = (selectedInUse ? selected[logical] : logical);
           partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-          hashCode =
-              ObjectInspectorUtils.getBucketHashCode(
-                  partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int hashCode, bucketNum;
+          if (bucketingVersion == 2 && !vectorDesc.getIsAcidChange()) {
+            bucketNum =
+                ObjectInspectorUtils.getBucketNumber(
+                    bucketFieldValues, bucketObjectInspectors, numBuckets);
+            hashCode = ObjectInspectorUtils.getBucketHashCode(
+                partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
+          } else { // old bucketing logic
+            bucketNum =
+                ObjectInspectorUtils.getBucketNumberOld(
+                    bucketFieldValues, bucketObjectInspectors, numBuckets);
+            hashCode = ObjectInspectorUtils.getBucketHashCodeOld(
+                partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
+          }
+          postProcess(batch, batchIndex, tag, hashCode);
         }
       }
+    }
+  } catch (Exception e) {
+    throw new HiveException(e);
+  }
+}
 
-      if (!isEmptyKey) {
-        keyBinarySortableSerializeWrite.reset();
-        keyVectorSerializeRow.serializeWrite(batch, batchIndex);
-
-        // One serialized key for 1 or more rows for the duplicate keys.
-        final int keyLength = keyOutput.getLength();
-        if (tag == -1 || reduceSkipTag) {
-          keyWritable.set(keyOutput.getData(), 0, keyLength);
-        } else {
-          keyWritable.setSize(keyLength + 1);
-          System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength);
-          keyWritable.get()[keyLength] = reduceTagByte;
-        }
-        keyWritable.setDistKeyLength(keyLength);
-      }
+  private void processKey(VectorizedRowBatch batch, int batchIndex, int tag)
+      throws HiveException {
+    if (isEmptyKey) return;
 
-      keyWritable.setHashCode(hashCode);
+    try {
+      keyBinarySortableSerializeWrite.reset();
+      keyVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+      // One serialized key for 1 or more rows for the duplicate keys.
+      final int keyLength = keyOutput.getLength();
+      if (tag == -1 || reduceSkipTag) {
+        keyWritable.set(keyOutput.getData(), 0, keyLength);
+      } else {
+        keyWritable.setSize(keyLength + 1);
+        System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength);
+        keyWritable.get()[keyLength] = reduceTagByte;
+      }
+      keyWritable.setDistKeyLength(keyLength);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
 
-      if (!isEmptyValue) {
-        valueLazyBinarySerializeWrite.reset();
-        valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+  private void processValue(VectorizedRowBatch batch, int batchIndex) throws HiveException {
+    if (isEmptyValue) return;
 
-        valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
-      }
+    try {
+      valueLazyBinarySerializeWrite.reset();
+      valueVectorSerializeRow.serializeWrite(batch, batchIndex);
 
-      collect(keyWritable, valueBytesWritable);
-    }
+      valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private void postProcess(VectorizedRowBatch batch, int batchIndex, int tag, int hashCode) throws HiveException {
+    try {
+      processKey(batch, batchIndex, tag);
+      keyWritable.setHashCode(hashCode);
+      processValue(batch, batchIndex);
+      collect(keyWritable, valueBytesWritable);
    } catch (Exception e) {
      throw new HiveException(e);
    }
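Note on the restructuring above: the bucketing/partition flags are invariant across a batch, so the rewrite is classic loop unswitching, deciding the branch once and running one tight loop per case instead of re-testing per row. A standalone sketch of the same pattern (names and hash stand-ins are illustrative, not from the patch):

    public class LoopUnswitchDemo {
      interface RowSink { void accept(int row, int hashCode); }

      // Before: the branch would be evaluated per row. After (as in the patch):
      // one branch up front, then two specialized loops.
      static void process(int[] rows, boolean useMurmur, RowSink sink) {
        if (useMurmur) {
          for (int row : rows) {
            sink.accept(row, Integer.rotateLeft(row * 0x1b873593, 13)); // stand-in for Murmur3
          }
        } else {
          for (int row : rows) {
            sink.accept(row, row * 31); // stand-in for the legacy hash
          }
        }
      }

      public static void main(String[] args) {
        process(new int[]{1, 2, 3}, true, (r, h) -> System.out.println(r + " -> " + h));
      }
    }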
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 71498a1..9a21503 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -91,6 +91,7 @@ public final class HiveFileFormatUtils {
 
     // immutable maps
     Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap;
+    Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> textInputFormatCheckerMap;
     Map<Class<?>, Class<? extends OutputFormat>> outputFormatSubstituteMap;
 
     // mutable thread-safe map to store instances
@@ -114,6 +115,10 @@ public final class HiveFileFormatUtils {
           .put(RCFileInputFormat.class, RCFileInputFormat.class)
           .put(OrcInputFormat.class, OrcInputFormat.class)
           .build();
+      textInputFormatCheckerMap = ImmutableMap
+          .<Class<? extends InputFormat>, Class<? extends InputFormatChecker>>builder()
+          .put(SequenceFileInputFormat.class, SequenceFileInputFormatChecker.class)
+          .build();
       outputFormatSubstituteMap = ImmutableMap
           .<Class<?>, Class<? extends OutputFormat>>builder()
           .put(IgnoreKeyTextOutputFormat.class, HiveIgnoreKeyTextOutputFormat.class)
@@ -129,6 +134,10 @@ public final class HiveFileFormatUtils {
       return inputFormatCheckerMap.keySet();
     }
 
+    public Set<Class<? extends InputFormat>> registeredTextClasses() {
+      return textInputFormatCheckerMap.keySet();
+    }
+
     public Class<? extends OutputFormat> getOutputFormatSubstiture(Class<?> origin) {
       return outputFormatSubstituteMap.get(origin);
     }
@@ -214,7 +223,7 @@ public final class HiveFileFormatUtils {
       }
     }
     if (files2.isEmpty()) return true;
-    Set<Class<? extends InputFormat>> inputFormatter = FileChecker.getInstance().registeredClasses();
+    Set<Class<? extends InputFormat>> inputFormatter = FileChecker.getInstance().registeredTextClasses();
     for (Class<? extends InputFormat> reg : inputFormatter) {
       boolean result = checkInputFormat(fs, conf, reg, files2);
       if (result) {

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 019682f..2337a35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -555,9 +555,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       return false;
     }
     for (FileStatus file : files) {
-      // 0 length files cannot be ORC files
-      if (file.getLen() == 0) {
-        return false;
+      if (!HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) {
+        // 0 length files cannot be ORC files, not valid for MR.
+        if (file.getLen() == 0) {
+          return false;
+        }
       }
       try {
         OrcFile.createReader(file.getPath(),

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index a51fdd3..abd678b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -188,6 +188,9 @@ public class Table implements Serializable {
       // set create time
       t.setCreateTime((int) (System.currentTimeMillis() / 1000));
     }
+    // Explicitly set the bucketing version
+    t.getParameters().put(hive_metastoreConstants.TABLE_BUCKETING_VERSION,
+        "2");
     return t;
   }
@@ -399,6 +402,9 @@ public class Table implements Serializable {
     tTable.getParameters().put(name, value);
   }
 
+  // Please note: be very careful when using this function. If not used carefully,
+  // you may end up overwriting all the existing properties. If the use case is to
+  // add or update certain properties, use setProperty() instead.
   public void setParameters(Map<String, String> params) {
     tTable.setParameters(params);
   }
@@ -450,6 +456,11 @@ public class Table implements Serializable {
     }
   }
 
+  public int getBucketingVersion() {
+    return Utilities.getBucketingVersion(
+        getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
+  }
+
   @Override
   public String toString() {
     return tTable.getTableName();
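Note on the Table.java hunks above: newly created tables are stamped with bucketing_version=2 in their parameters, and getBucketingVersion() reads that property back through Utilities.getBucketingVersion. A minimal sketch of what such a lookup has to do (the default of 1 for legacy tables without the property is an assumption here, not quoted from Utilities):

    public class BucketingVersionLookup {
      // Illustrative stand-in for Utilities.getBucketingVersion: parse the
      // "bucketing_version" table property; tables that predate the property
      // are assumed to be version 1 (assumption for this sketch).
      static int getBucketingVersion(String versionProperty) {
        if (versionProperty == null) {
          return 1;
        }
        try {
          return Integer.parseInt(versionProperty);
        } catch (NumberFormatException e) {
          return 1;
        }
      }

      public static void main(String[] args) {
        System.out.println(getBucketingVersion(null)); // legacy table -> 1
        System.out.println(getBucketingVersion("2"));  // newly created table -> 2
      }
    }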
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 7121bce..5d4774d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -153,15 +154,16 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
     }
 
-    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-      // Check if we are in LLAP, if so it needs to be determined if we should use BMJ or DPHJ
-      if (llapInfo != null) {
-        if (selectJoinForLlap(context, joinOp, tezBucketJoinProcCtx, llapInfo, mapJoinConversionPos, numBuckets)) {
+    if (numBuckets > 1) {
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+        // Check if we are in LLAP, if so it needs to be determined if we should use BMJ or DPHJ
+        if (llapInfo != null) {
+          if (selectJoinForLlap(context, joinOp, tezBucketJoinProcCtx, llapInfo, mapJoinConversionPos, numBuckets)) {
+            return null;
+          }
+        } else if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
           return null;
         }
-      } else if (numBuckets > 1 &&
-          convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-        return null;
       }
     }
 
@@ -180,7 +182,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
-    mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks()));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null,
+        joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion()));
     preserveOperatorInfos(mapJoinOp, joinOp, context);
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
@@ -381,7 +384,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     context.parseContext.getContext().getPlanMapper().link(joinOp, mergeJoinOp);
     int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks();
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets,
-        joinOp.getOpTraits().getSortCols(), numReduceSinks);
+        joinOp.getOpTraits().getSortCols(), numReduceSinks,
+        joinOp.getOpTraits().getBucketingVersion());
     mergeJoinOp.setOpTraits(opTraits);
     preserveOperatorInfos(mergeJoinOp, joinOp, context);
@@ -448,7 +452,8 @@
       return;
     }
     currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(),
-        opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks()));
+        opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks(),
+        opTraits.getBucketingVersion()));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -501,7 +506,8 @@
     // we can set the traits for this join operator
     opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-        tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks());
+        tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks(),
+        joinOp.getOpTraits().getBucketingVersion());
     mapJoinOp.setOpTraits(opTraits);
     preserveOperatorInfos(mapJoinOp, joinOp, context);
     setNumberOfBucketsOnChildren(mapJoinOp);
@@ -612,6 +618,38 @@
       numBuckets = bigTableRS.getConf().getNumReducers();
     }
     tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+
+    // Bucketing uses two different versions: version 1 for existing
+    // tables and version 2 for new tables. All the inputs to the SMB must be
+    // from the same version. This only applies to tables read directly, not to
+    // intermediate outputs of joins/groupbys.
+    int bucketingVersion = -1;
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      // Check if the parent is coming from a table scan, if so, what is the version of it.
+      assert parentOp.getParentOperators() != null && parentOp.getParentOperators().size() == 1;
+      Operator<?> op = parentOp.getParentOperators().get(0);
+      while (op != null && !(op instanceof TableScanOperator
+          || op instanceof ReduceSinkOperator
+          || op instanceof CommonJoinOperator)) {
+        // If op has parents it is guaranteed to be 1.
+        List<Operator<?>> parents = op.getParentOperators();
+        Preconditions.checkState(parents.size() == 0 || parents.size() == 1);
+        op = parents.size() == 1 ? parents.get(0) : null;
+      }
+
+      if (op instanceof TableScanOperator) {
+        int localVersion = ((TableScanOperator) op).getConf().
+            getTableMetadata().getBucketingVersion();
+        if (bucketingVersion == -1) {
+          bucketingVersion = localVersion;
+        } else if (bucketingVersion != localVersion) {
+          // versions don't match, return false.
+          LOG.debug("SMB Join can't be performed due to bucketing version mismatch");
+          return false;
+        }
+      }
+    }
+
     LOG.info("We can convert the join to an SMB join.");
     return true;
   }
@@ -1189,7 +1227,8 @@
         joinOp.getOpTraits().getBucketColNames(),
         numReducers,
         null,
-        joinOp.getOpTraits().getNumReduceSinks());
+        joinOp.getOpTraits().getNumReduceSinks(),
+        joinOp.getOpTraits().getBucketingVersion());
    mapJoinOp.setOpTraits(opTraits);
    preserveOperatorInfos(mapJoinOp, joinOp, context);
    // propagate this change till the next RS
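Note on why the SMB check above is necessary: an SMB join relies on both sides placing equal keys in equally numbered bucket files, and the two hash versions bucket the same key differently. A standalone sketch of the effect (hashV1/hashV2 are simplified stand-ins, not Hive's real implementations; bucket() mirrors the documented "& Integer.MAX_VALUE then modulo" convention):

    public class BucketVersionDemo {
      // Stand-ins: version 1 roughly hashed an int column to itself,
      // version 2 runs it through Murmur3 (here: a Murmur3-style finalizer mix).
      static int hashV1(int value) { return value; }
      static int hashV2(int value) {
        int h = value;
        h ^= h >>> 16; h *= 0x85ebca6b;
        h ^= h >>> 13; h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
      }
      static int bucket(int hash, int numBuckets) {
        return (hash & Integer.MAX_VALUE) % numBuckets;
      }
      public static void main(String[] args) {
        int key = 42, numBuckets = 4;
        // The same key usually lands in different buckets under the two versions,
        // so an SMB join over mixed-version tables would mis-align rows.
        System.out.println("v1 bucket: " + bucket(hashV1(key), numBuckets));
        System.out.println("v2 bucket: " + bucket(hashV2(key), numBuckets));
      }
    }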
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
index 5f65f63..4f7d3c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree.Operator;
@@ -84,8 +83,7 @@ public class FixedBucketPruningOptimizer extends Transform {
 
     @Override
     protected void generatePredicate(NodeProcessorCtx procCtx,
-        FilterOperator fop, TableScanOperator top) throws SemanticException,
-        UDFArgumentException {
+        FilterOperator fop, TableScanOperator top) throws SemanticException {
       FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
       Table tbl = top.getConf().getTableMetadata();
       if (tbl.getNumBuckets() > 0) {
@@ -122,8 +120,7 @@ public class FixedBucketPruningOptimizer extends Transform {
 
     @Override
     protected void generatePredicate(NodeProcessorCtx procCtx,
-        FilterOperator fop, TableScanOperator top) throws SemanticException,
-        UDFArgumentException {
+        FilterOperator fop, TableScanOperator top) throws SemanticException {
       FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
       if (ctxt.getNumBuckets() <= 0 || ctxt.getBucketCols().size() != 1) {
         // bucketing isn't consistent or there are >1 bucket columns
@@ -225,6 +222,9 @@ public class FixedBucketPruningOptimizer extends Transform {
       bs.clear();
       PrimitiveObjectInspector bucketOI = (PrimitiveObjectInspector) bucketField.getFieldObjectInspector();
       PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(bucketOI.getPrimitiveCategory());
+      // Fetch the bucketing version from the table scan operator
+      int bucketingVersion = top.getConf().getTableMetadata().getBucketingVersion();
+
       for (Object literal : literals) {
         PrimitiveObjectInspector origOI = PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(literal.getClass());
         Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI);
@@ -233,10 +233,12 @@
           return;
         }
         Object convCols[] = new Object[] {conv.convert(literal)};
-        int n = ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets());
+        int n = bucketingVersion == 2 ?
+            ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets()) :
+            ObjectInspectorUtils.getBucketNumberOld(convCols, new ObjectInspector[]{constOI}, ctxt.getNumBuckets());
         bs.set(n);
-        if (ctxt.isCompat()) {
-          int h = ObjectInspectorUtils.getBucketHashCode(convCols, new ObjectInspector[]{constOI});
+        if (bucketingVersion == 1 && ctxt.isCompat()) {
+          int h = ObjectInspectorUtils.getBucketHashCodeOld(convCols, new ObjectInspector[]{constOI});
           // -ve hashcodes had conversion to positive done in different ways in the past
           // abs() is now obsolete and all inserts now use & Integer.MAX_VALUE
          // the compat mode assumes that old data could've been loaded using the other conversion

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
index 2be3c9b..1626e26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
 
 /**
  * Operator factory for pruning processing of operator graph We find
@@ -101,7 +102,7 @@ public abstract class PrunerOperatorFactory {
    * @throws UDFArgumentException
    */
   protected abstract void generatePredicate(NodeProcessorCtx procCtx, FilterOperator fop,
-      TableScanOperator top) throws SemanticException, UDFArgumentException;
+      TableScanOperator top) throws SemanticException;
 
   /**
    * Add pruning predicate.
   *

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 1c56562..51010aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -245,7 +245,7 @@ public class SortedDynPartitionOptimizer extends Transform {
 
     // Create ReduceSink operator
     ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder,
-        allRSCols, bucketColumns, numBuckets, fsParent);
+        allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
     List<ExprNodeDesc> descs = new ArrayList<ExprNodeDesc>(allRSCols.size());
     List<String> colNames = new ArrayList<String>();
@@ -442,7 +442,7 @@ public class SortedDynPartitionOptimizer extends Transform {
     public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List<Integer> sortPositions,
         List<Integer> sortOrder, List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols,
         ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
-        Operator<? extends OperatorDesc> parent) throws SemanticException {
+        Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) throws SemanticException {
 
       // Order of KEY columns
       // 1) Partition columns
@@ -577,7 +577,7 @@
       // Number of reducers is set to default (-1)
       ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
           keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, keyTable,
-          valueTable);
+          valueTable, writeType);
 
      rsConf.setBucketCols(bucketColumns);
      rsConf.setNumBuckets(numBuckets);
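Note on the FixedBucketPruningOptimizer hunks above: the optimizer hashes each filter literal with the table's own bucketing version and records the resulting bucket in a BitSet, so only those bucket files need scanning. A standalone sketch of that BitSet logic (bucketNumber() is a simplified stand-in for the version-aware ObjectInspectorUtils calls):

    import java.util.BitSet;

    public class BucketPruneDemo {
      // Stand-in for getBucketNumber/getBucketNumberOld on a single int literal;
      // the hash functions here are illustrative only.
      static int bucketNumber(int literal, int numBuckets, int bucketingVersion) {
        int hash = (bucketingVersion == 2) ? Integer.rotateLeft(literal * 0x1b873593, 13) : literal;
        return (hash & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        int numBuckets = 8;
        int[] literals = {10, 20, 30};   // values taken from the filter predicate
        BitSet bs = new BitSet(numBuckets);
        for (int literal : literals) {
          bs.set(bucketNumber(literal, numBuckets, 2));
        }
        // Only files for these buckets need to be read.
        System.out.println("buckets to scan: " + bs);
      }
    }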
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index 0e995d7..0ce359f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -209,7 +210,7 @@
         sortNullOrder = Lists.newArrayList(0); // nulls first
       }
       ReduceSinkOperator rsOp = getReduceSinkOp(keyPositions, sortOrder,
-          sortNullOrder, allRSCols, granularitySelOp);
+          sortNullOrder, allRSCols, granularitySelOp, fsOp.getConf().getWriteType());
 
       // Create backtrack SelectOp
       final List<ExprNodeDesc> descs = new ArrayList<>(allRSCols.size());
@@ -393,8 +394,8 @@
     }
 
     private ReduceSinkOperator getReduceSinkOp(List<Integer> keyPositions, List<Integer> sortOrder,
-        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, Operator<? extends OperatorDesc> parent
-        ) {
+        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, Operator<? extends OperatorDesc> parent,
+        AcidUtils.Operation writeType) {
       // we will clone here as RS will update bucket column key with its
       // corresponding with bucket number and hence their OIs
       final ArrayList<ExprNodeDesc> keyCols = keyPositions.stream()
@@ -452,7 +453,7 @@
       // Number of reducers is set to default (-1)
       final ReduceSinkDesc rsConf = new ReduceSinkDesc(keyCols, keyCols.size(), valCols,
           keyColNames, distinctColumnIndices, valColNames, -1, partCols, -1, keyTable,
-          valueTable);
+          valueTable, writeType);
 
       final ArrayList<ColumnInfo> signature = parent.getSchema().getSignature()

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
index 69d9f31..9e54465 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
@@ -92,10 +92,12 @@ public class OpTraitsRulesProcFactory {
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       int numBuckets = -1;
       int numReduceSinks = 1;
+      int bucketingVersion = -1;
       OpTraits parentOpTraits = rs.getParentOperators().get(0).getOpTraits();
       if (parentOpTraits != null) {
         numBuckets = parentOpTraits.getNumBuckets();
         numReduceSinks += parentOpTraits.getNumReduceSinks();
+        bucketingVersion = parentOpTraits.getBucketingVersion();
       }
 
       List<String> bucketCols = new ArrayList<>();
@@ -134,8 +136,10 @@
       }
 
       listBucketCols.add(bucketCols);
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets,
+          listBucketCols, numReduceSinks, bucketingVersion);
       rs.setOpTraits(opTraits);
+      rs.setBucketingVersion(bucketingVersion);
       return null;
     }
   }
@@ -213,7 +217,8 @@
         sortedColsList.add(sortCols);
       }
       // num reduce sinks hardcoded to 0 because TS has no parents
-      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0);
+      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets,
+          sortedColsList, 0, table.getBucketingVersion());
       ts.setOpTraits(opTraits);
       return null;
     }
@@ -239,12 +244,15 @@
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       int numReduceSinks = 0;
+      int bucketingVersion = -1;
       OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits();
       if (parentOpTraits != null) {
         numReduceSinks = parentOpTraits.getNumReduceSinks();
+        bucketingVersion = parentOpTraits.getBucketingVersion();
       }
       listBucketCols.add(gbyKeys);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols,
+          numReduceSinks, bucketingVersion);
       gbyOp.setOpTraits(opTraits);
       return null;
     }
@@ -298,12 +306,15 @@
 
       int numBuckets = -1;
       int numReduceSinks = 0;
+      int bucketingVersion = -1;
       OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits();
       if (parentOpTraits != null) {
         numBuckets = parentOpTraits.getNumBuckets();
         numReduceSinks = parentOpTraits.getNumReduceSinks();
+        bucketingVersion = parentOpTraits.getBucketingVersion();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols,
+          numReduceSinks, bucketingVersion);
       selOp.setOpTraits(opTraits);
       return null;
     }
@@ -338,7 +349,10 @@
         pos++;
       }
 
-      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks));
+      // The bucketingVersion is not relevant here as it is never used.
+      // For SMB, we look at the parent tables' bucketing versions and for
+      // bucket map join the big table's bucketing version is considered.
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks, 2));
       return null;
     }
 
@@ -392,6 +406,8 @@
       Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
 
       int numReduceSinks = 0;
+      int bucketingVersion = -1;
+      boolean bucketingVersionSeen = false;
       for (Operator<?> parentOp : operator.getParentOperators()) {
         if (parentOp.getOpTraits() == null) {
           continue;
         }
         if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) {
           numReduceSinks = parentOp.getOpTraits().getNumReduceSinks();
         }
+        // If there is a mismatch in bucketingVersion, then it should be set to
+        // -1; that way SMB will be disabled.
+        if (bucketingVersion == -1 && !bucketingVersionSeen) {
+          bucketingVersion = parentOp.getOpTraits().getBucketingVersion();
+          bucketingVersionSeen = true;
+        } else if (bucketingVersion != parentOp.getOpTraits().getBucketingVersion()) {
+          bucketingVersion = -1;
+        }
       }
-      OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks);
+      OpTraits opTraits = new OpTraits(null, -1,
+          null, numReduceSinks, bucketingVersion);
      operator.setOpTraits(opTraits);
      return null;
    }
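Note on the DefaultRule hunk above: the bucketing version is folded across all parents, collapsing to the -1 sentinel on any disagreement so downstream SMB conversion is disabled. The same merge logic, extracted into a standalone sketch:

    import java.util.List;

    public class BucketingVersionMerge {
      // Mirrors the DefaultRule logic: take the parents' common bucketing
      // version, or -1 on any mismatch so SMB conversion is disabled.
      static int mergeVersions(List<Integer> parentVersions) {
        int bucketingVersion = -1;
        boolean seen = false;
        for (int v : parentVersions) {
          if (!seen) {
            bucketingVersion = v;
            seen = true;
          } else if (bucketingVersion != v) {
            return -1; // mismatch: disable SMB
          }
        }
        return bucketingVersion;
      }

      public static void main(String[] args) {
        System.out.println(mergeVersions(List.of(2, 2))); // 2
        System.out.println(mergeVersions(List.of(1, 2))); // -1
      }
    }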
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 068f25e..394f826 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -40,6 +40,8 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,11 +70,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkEmptyKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkObjectHashOperator;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
 import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
@@ -3808,6 +3805,9 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     LOG.info("Vectorizer vectorizeOperator reduce sink class " + opClass.getSimpleName());
 
+    // Get the bucketing version
+    int bucketingVersion = ((ReduceSinkOperator) op).getBucketingVersion();
+
     Operator<? extends OperatorDesc> vectorOp = null;
     try {
       vectorOp = OperatorFactory.getVectorOperator(
@@ -3819,6 +3819,10 @@
       throw new HiveException(e);
     }
 
+    // Set the bucketing version
+    Preconditions.checkArgument(vectorOp instanceof VectorReduceSinkCommonOperator);
+    vectorOp.setBucketingVersion(bucketingVersion);
+
     return vectorOp;
   }
 
@@ -4026,6 +4030,8 @@
     vectorDesc.setHasDistinctColumns(hasDistinctColumns);
     vectorDesc.setIsKeyBinarySortable(isKeyBinarySortable);
     vectorDesc.setIsValueLazyBinary(isValueLazyBinary);
+    vectorDesc.setIsAcidChange(desc.getWriteType() == AcidUtils.Operation.DELETE ||
+        desc.getWriteType() == AcidUtils.Operation.UPDATE);
 
     // This indicates we logged an inconsistency (from our point-of-view) and will not make this
     // operator native...

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 7b1fd5f..8e75db9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -121,7 +121,8 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     }
 
     // we can set the traits for this join operator
-    OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks());
+    OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null,
+        joinOp.getOpTraits().getNumReduceSinks(), joinOp.getOpTraits().getBucketingVersion());
     mapJoinOp.setOpTraits(opTraits);
     mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1dccf96..0205650 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -235,6 +235,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCardinalityViolation;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline;
@@ -8411,9 +8412,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         sortCols.add(exprNode);
       }
     }
+
+    Table dest_tab = qb.getMetaData().getDestTableForAlias(dest);
+    AcidUtils.Operation acidOp = Operation.NOT_ACID;
+    if (AcidUtils.isFullAcidTable(dest_tab)) {
+      acidOp = getAcidType(Utilities.getTableDesc(dest_tab).getOutputFileFormatClass(), dest);
+    }
     Operator result = genReduceSinkPlan(
         input, partCols, sortCols, order.toString(), nullOrder.toString(),
-        numReducers, Operation.NOT_ACID, true);
+        numReducers, acidOp, true);
     if (result.getParentOperators().size() == 1 &&
         result.getParentOperators().get(0) instanceof ReduceSinkOperator) {
       ((ReduceSinkOperator) result.getParentOperators().get(0))
@@ -10806,7 +10813,8 @@
    */
   private ExprNodeDesc genSamplePredicate(TableSample ts,
       List<String> bucketCols, boolean useBucketCols, String alias,
-      RowResolver rwsch, QBMetaData qbm, ExprNodeDesc planExpr)
+      RowResolver rwsch, QBMetaData qbm, ExprNodeDesc planExpr,
+      int bucketingVersion)
       throws SemanticException {
 
     ExprNodeDesc numeratorExpr = new ExprNodeConstantDesc(
@@ -10836,22 +10844,19 @@
     ExprNodeDesc equalsExpr = null;
     {
       ExprNodeDesc hashfnExpr = new ExprNodeGenericFuncDesc(
-          TypeInfoFactory.intTypeInfo, new GenericUDFHash(), args);
-      assert (hashfnExpr != null);
+          TypeInfoFactory.intTypeInfo,
+          bucketingVersion == 2 ? new GenericUDFMurmurHash() : new GenericUDFHash(), args);
       LOG.info("hashfnExpr = " + hashfnExpr);
       ExprNodeDesc andExpr = TypeCheckProcFactory.DefaultExprProcessor
           .getFuncExprNodeDesc("&", hashfnExpr, intMaxExpr);
-      assert (andExpr != null);
       LOG.info("andExpr = " + andExpr);
       ExprNodeDesc modExpr = TypeCheckProcFactory.DefaultExprProcessor
           .getFuncExprNodeDesc("%", andExpr, denominatorExpr);
-      assert (modExpr != null);
       LOG.info("modExpr = " + modExpr);
       LOG.info("numeratorExpr = " + numeratorExpr);
       equalsExpr = TypeCheckProcFactory.DefaultExprProcessor
           .getFuncExprNodeDesc("==", modExpr, numeratorExpr);
       LOG.info("equalsExpr = " + equalsExpr);
-      assert (equalsExpr != null);
     }
     return equalsExpr;
   }
@@ -10952,6 +10957,9 @@
         topToTableProps.put(top, properties);
         tsDesc.setOpProps(properties);
       }
+
+      // Set the bucketing version
+      top.setBucketingVersion(tsDesc.getTableMetadata().getBucketingVersion());
     } else {
       rwsch = opParseCtx.get(top).getRowResolver();
       top.setChildOperators(null);
@@ -11020,7 +11028,8 @@
       // later
       LOG.info("No need for sample filter");
       ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
-          colsEqual, alias, rwsch, qb.getMetaData(), null);
+          colsEqual, alias, rwsch, qb.getMetaData(), null,
+          tab.getBucketingVersion());
       FilterDesc filterDesc = new FilterDesc(
           samplePredicate, true, new SampleDesc(ts.getNumerator(),
               ts.getDenominator(), tabBucketCols, true));
@@ -11032,7 +11041,8 @@
       // create tableOp to be filterDesc and set as child to 'top'
       LOG.info("Need sample filter");
       ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
-          colsEqual, alias, rwsch, qb.getMetaData(), null);
+          colsEqual, alias, rwsch, qb.getMetaData(), null,
+          tab.getBucketingVersion());
       FilterDesc filterDesc = new FilterDesc(samplePredicate, true);
       filterDesc.setGenerated(true);
       op = OperatorFactory.getAndMakeChild(filterDesc,
@@ -11063,7 +11073,8 @@
         tsSample.setInputPruning(true);
         qb.getParseInfo().setTabSample(alias, tsSample);
         ExprNodeDesc samplePred = genSamplePredicate(tsSample, tab
-            .getBucketCols(), true, alias, rwsch, qb.getMetaData(), null);
+            .getBucketCols(), true, alias, rwsch, qb.getMetaData(), null,
+            tab.getBucketingVersion());
         FilterDesc filterDesc = new FilterDesc(samplePred, true,
             new SampleDesc(tsSample.getNumerator(), tsSample
                 .getDenominator(), tab.getBucketCols(), true));
@@ -11082,7 +11093,7 @@
            .getFuncExprNodeDesc("rand", new ExprNodeConstantDesc(Integer
            .valueOf(460476415)));
       ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false,
-          alias, rwsch, qb.getMetaData(), randFunc);
+          alias, rwsch, qb.getMetaData(), randFunc, tab.getBucketingVersion());
       FilterDesc filterDesc = new FilterDesc(samplePred, true);
       filterDesc.setGenerated(true);
       op = OperatorFactory.getAndMakeChild(filterDesc,

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
index 9621c3b..d3b62ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
@@ -22,17 +22,20 @@ import java.util.List;
 
 public class OpTraits {
 
-  List<List<String>> bucketColNames;
-  List<List<String>> sortColNames;
-  int numBuckets;
-  int numReduceSinks;
+  private List<List<String>> bucketColNames;
+  private List<List<String>> sortColNames;
+  private int numBuckets;
+  private int numReduceSinks;
+  private int bucketingVersion;
 
   public OpTraits(List<List<String>> bucketColNames, int numBuckets,
-      List<List<String>> sortColNames, int numReduceSinks) {
+      List<List<String>> sortColNames, int numReduceSinks,
+      int bucketingVersion) {
     this.bucketColNames = bucketColNames;
     this.numBuckets = numBuckets;
     this.sortColNames = sortColNames;
     this.numReduceSinks = numReduceSinks;
+    this.bucketingVersion = bucketingVersion;
   }
 
   public List<List<String>> getBucketColNames() {
@@ -68,10 +71,17 @@
     return this.numReduceSinks;
   }
 
-
+  public void setBucketingVersion(int bucketingVersion) {
+    this.bucketingVersion = bucketingVersion;
+  }
+
+  public int getBucketingVersion() {
+    return bucketingVersion;
+  }
+
   @Override
   public String toString() {
     return "{ bucket column names: " + bucketColNames + "; sort column names: "
-        + sortColNames + "; bucket count: " + numBuckets + " }";
+        + sortColNames + "; bucket count: " + numBuckets + "; bucketing version: " + bucketingVersion + " }";
  }
}
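Note on the genSamplePredicate hunks above: the analyzer builds a filter of the shape ((hash(bucketCols) & Integer.MAX_VALUE) % denominator) == numerator - 1, now choosing GenericUDFMurmurHash for version-2 tables and GenericUDFHash otherwise. The same predicate in plain Java, as a sketch (the numerator - 1 offset follows the numeratorExpr construction in this method; treat it as read from this diff, not as independent documentation):

    public class TableSampleDemo {
      // TABLESAMPLE (BUCKET n OUT OF d): keep rows whose hash falls in bucket n-1.
      static boolean sampled(int hash, int numerator, int denominator) {
        return ((hash & Integer.MAX_VALUE) % denominator) == numerator - 1;
      }

      public static void main(String[] args) {
        // e.g. TABLESAMPLE (BUCKET 3 OUT OF 32) with some row's hash value
        System.out.println(sampled(1234567, 3, 32));
      }
    }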
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 056dfa4..2c5b655 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -790,7 +790,7 @@ public final class PlanUtils {
     return new ReduceSinkDesc(keyCols, numKeys, valueCols, outputKeyCols,
         distinctColIndices, outputValCols,
         tag, partitionCols, numReducers, keyTable,
-        valueTable);
+        valueTable, writeType);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index aa3c72b..61216bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -127,6 +128,8 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
 
   private static transient Logger LOG = LoggerFactory.getLogger(ReduceSinkDesc.class);
 
+  private AcidUtils.Operation writeType;
+
   public ReduceSinkDesc() {
   }
 
@@ -137,7 +140,8 @@
       List<List<Integer>> distinctColumnIndices,
       ArrayList<String> outputValueColumnNames, int tag,
       ArrayList<ExprNodeDesc> partitionCols, int numReducers,
-      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo,
+      AcidUtils.Operation writeType) {
     this.keyCols = keyCols;
     this.numDistributionKeys = numDistributionKeys;
     this.valueCols = valueCols;
@@ -151,6 +155,7 @@
     this.distinctColumnIndices = distinctColumnIndices;
     this.setNumBuckets(-1);
     this.setBucketCols(null);
+    this.writeType = writeType;
   }
 
   @Override
@@ -669,4 +674,7 @@
     return false;
   }
 
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 25b9189..4068e56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -184,6 +184,11 @@ public class TableDesc implements Serializable, Cloneable {
     return (properties.getProperty(hive_metastoreConstants.META_TABLE_STORAGE) != null);
   }
 
+  public int getBucketingVersion() {
+    return Utilities.getBucketingVersion(
+        properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
+  }
+
   @Override
   public Object clone() {
     TableDesc ret = new TableDesc();

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
index adea3b5..97e4284 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
@@ -73,6 +73,7 @@ public class VectorReduceSinkDesc extends AbstractVectorDesc {
   private boolean isKeyBinarySortable;
   private boolean isValueLazyBinary;
   private boolean isUnexpectedCondition;
+  private boolean isAcidChange;
 
   /*
    * The following conditions are for native Vector ReduceSink.
@@ -143,4 +144,12 @@
   public boolean getIsUnexpectedCondition() {
     return isUnexpectedCondition;
   }
+
+  public void setIsAcidChange(boolean isAcidChange) {
+    this.isAcidChange = isAcidChange;
+  }
+
+  public boolean getIsAcidChange() {
+    return isAcidChange;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 7cd5718..1a75843 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.IntWritable;
 /**
  * GenericUDF Class for computing hash values.
  */
+@Deprecated
 @Description(name = "hash", value = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments")
 public class GenericUDFHash extends GenericUDF {
   private transient ObjectInspector[] argumentOIs;
@@ -48,7 +49,7 @@
     for (int i = 0; i < arguments.length; i++) {
       fieldValues[i] = arguments[i].get();
     }
-    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
+    int r = ObjectInspectorUtils.getBucketHashCodeOld(fieldValues, argumentOIs);
     result.set(r);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFMurmurHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFMurmurHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFMurmurHash.java
new file mode 100644
index 0000000..f55ab9d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFMurmurHash.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hive.common.util.Murmur3;
+
+/**
+ * GenericUDF Class for computing murmurhash values.
+ */
+@Description(name = "hash", value = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments")
+public class GenericUDFMurmurHash extends GenericUDF {
+  private transient ObjectInspector[] argumentOIs;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentTypeException {
+
+    argumentOIs = arguments;
+    return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+  }
+
+  private final IntWritable result = new IntWritable();
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    Object[] fieldValues = new Object[arguments.length];
+    for (int i = 0; i < arguments.length; i++) {
+      fieldValues[i] = arguments[i].get();
+    }
+    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
+    result.set(r);
+    return result;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("hash", children, ",");
+  }
+
+}
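Note on the new UDF above: GenericUDFMurmurHash keeps GenericUDFHash's shape but routes through the Murmur3-based getBucketHashCode. A rough driver for exercising such a GenericUDF outside a query, as a sketch (assumes the Hive ql jar on the classpath; the DeferredJavaObject harness is the standard GenericUDF test idiom, but exact behavior for these inputs is not taken from this commit):

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;

    public class MurmurUdfDemo {
      public static void main(String[] args) throws Exception {
        GenericUDFMurmurHash udf = new GenericUDFMurmurHash();
        ObjectInspector[] ois = {
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector };
        udf.initialize(ois);
        DeferredObject[] row = {
            new DeferredJavaObject(42), new DeferredJavaObject("acme") };
        IntWritable hash = (IntWritable) udf.evaluate(row);
        System.out.println("murmur bucket hash = " + hash.get());
        udf.close();
      }
    }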
*/ @Test - public void addPartitionReaname() throws Exception { + public void addPartitionRename() throws Exception { runStatementOnDriver("drop table if exists T"); runStatementOnDriver("drop table if exists Tstage"); runStatementOnDriver("create table T (a int, b int) partitioned by (p int) " + @@ -261,9 +262,9 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests { List<String> rs = runStatementOnDriver( "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); String[][] expected = new String[][]{ - {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", - "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t1\t4", + "warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}}; checkExpected(rs, expected, "add partition (p=0)"); } http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 12d57c6..6a3be39 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -765,13 +765,13 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1))); Assert.assertEquals("", 4, rs.size()); Assert.assertTrue(rs.get(0), - rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1")); + rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000001_0")); Assert.assertTrue(rs.get(1), - rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0")); + rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0_copy_1")); Assert.assertTrue(rs.get(2), - rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); @@ -786,13 +786,13 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { } Assert.assertEquals("", 4, rs.size()); Assert.assertTrue(rs.get(0), - rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000")); + rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00001")); Assert.assertTrue(rs.get(1), - rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(1), 
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 12d57c6..6a3be39 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -765,13 +765,13 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
       BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     Assert.assertEquals("", 4, rs.size());
     Assert.assertTrue(rs.get(0),
-      rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
+      rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000001_0"));
     Assert.assertTrue(rs.get(1),
-      rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
+      rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0_copy_1"));
     Assert.assertTrue(rs.get(2),
-      rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+      rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
     Assert.assertTrue(rs.get(3),
       rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
@@ -786,13 +786,13 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     }
     Assert.assertEquals("", 4, rs.size());
     Assert.assertTrue(rs.get(0),
-      rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000"));
+      rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
     Assert.assertTrue(rs.get(1),
-      rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+      rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(1),
       rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
     Assert.assertTrue(rs.get(2),
-      rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+      rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
     Assert.assertTrue(rs.get(2),
       rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
     Assert.assertTrue(rs.get(3),
       rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
@@ -820,7 +820,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
     Assert.assertEquals(stringifyValues(expected), r);
   }
-  //@Ignore("see bucket_num_reducers_acid2.q")
+  @Ignore("Moved to Tez")
   @Test
   public void testMoreBucketsThanReducers2() throws Exception {
     //todo: try using set VerifyNumReducersHook.num.reducers=10;
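The bucket-file moves in the expectations above follow from the bucket-number rule: a row goes to file (hash & Integer.MAX_VALUE) % numBuckets, and the hash source changed from the legacy Hive hash to Murmur3. A small sketch (the two hash codes below are illustrative stand-ins, not values Hive computed):

    public class BucketNumberDemo {
      // Hive's conventional mapping from a key's hash code to a bucket number.
      static int bucketFor(int hashCode, int numBuckets) {
        return (hashCode & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        int legacyHash = 0;           // hypothetical legacy hash of some key
        int murmurHash = 1294773633;  // hypothetical Murmur3 hash of the same key
        System.out.println(bucketFor(legacyHash, 2)); // 0 -> 000000_0 / bucket_00000
        System.out.println(bucketFor(murmurHash, 2)); // 1 -> 000001_0 / bucket_00001
      }
    }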
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index dc19752..e882e40 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -368,14 +368,14 @@ public class TestTxnCommands2 {
      * Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
      */
     String[][] expected = {
-      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"},
-      {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
-      {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
-      {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t0\t13", "bucket_00001"},
+      {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t15", "bucket_00001"},
+      {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t0\t17", "bucket_00001"},
+      {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t0\t120", "bucket_00001"},
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"},
       {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"},
-      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"},
+      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":6}\t1\t6", "bucket_00001"},
       {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
@@ -469,7 +469,7 @@ public class TestTxnCommands2 {
           sawNewDelta = true;
           FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
           Assert.assertEquals(1, buckets.length); // only one bucket file
-          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
         } else {
           Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
         }
@@ -495,14 +495,14 @@ public class TestTxnCommands2 {
       if (status[i].getPath().getName().matches("base_.*")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(1, buckets.length);
+        Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
       }
     }
     Assert.assertTrue(sawNewBase);
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
+    resultData = new int[][] {{3, 4}, {1, 2}};
+    Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
     rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
@@ -528,11 +528,11 @@ public class TestTxnCommands2 {
     Assert.assertEquals(1, status.length);
     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-    Assert.assertEquals(1, buckets.length);
+    Assert.assertEquals(2, buckets.length);
     Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
+    resultData = new int[][] {{3, 4}, {1, 2}};
+    Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
     rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
@@ -767,7 +767,7 @@ public class TestTxnCommands2 {
       } else if (numDelta == 2) {
         Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
         Assert.assertEquals(1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
       }
     } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
       numDeleteDelta++;
@@ -822,15 +822,15 @@ public class TestTxnCommands2 {
       } else if (numBase == 2) {
         // The new base dir now has two bucket files, since the delta dir has two bucket files
         Assert.assertEquals("base_0000002", status[i].getPath().getName());
-        Assert.assertEquals(1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
       }
     }
   }
   Assert.assertEquals(2, numBase);
   rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-  resultData = new int[][] {{1, 3}, {3, 4}};
-  Assert.assertEquals(stringifyValues(resultData), rs);
+  resultData = new int[][] {{3, 4}, {1, 3}};
+  Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
   rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
   resultCount = 2;
   Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
@@ -850,11 +850,11 @@ public class TestTxnCommands2 {
     Assert.assertEquals("base_0000002", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
     Arrays.sort(buckets);
-    Assert.assertEquals(1, buckets.length);
-    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+    Assert.assertEquals(2, buckets.length);
+    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
+    resultData = new int[][] {{3, 4}, {1, 3}};
+    Assert.assertEquals(stringifyValuesNoSort(resultData), rs);
     rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
@@ -2176,6 +2176,27 @@ public class TestTxnCommands2 {
     }
     return rs;
   }
+
+  /**
+   * Takes raw data and turns it into a string as if from Driver.getResults();
+   * preserves the caller's row order (does not sort).
+   */
+  static List<String> stringifyValuesNoSort(int[][] rowsIn) {
+    assert rowsIn.length > 0;
+    int[][] rows = rowsIn.clone();
+    List<String> rs = new ArrayList<String>();
+    for (int[] row : rows) {
+      assert row.length > 0;
+      StringBuilder sb = new StringBuilder();
+      for (int value : row) {
+        sb.append(value).append("\t");
+      }
+      sb.setLength(sb.length() - 1);
+      rs.add(sb.toString());
+    }
+    return rs;
+  }
+
   static class RowComp implements Comparator<int[]> {
     @Override
     public int compare(int[] row1, int[] row2) {
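The stringifyValuesNoSort helper added above exists because, with rows relocating to different buckets, file order no longer matches the dictionary order that stringifyValues imposes; these tests now assert the exact on-disk order. A self-contained sketch of what it produces:

    import java.util.ArrayList;
    import java.util.List;

    public class NoSortDemo {
      public static void main(String[] args) {
        int[][] rows = { {3, 4}, {1, 2} };  // same literal the updated asserts use
        List<String> rs = new ArrayList<String>();
        for (int[] row : rows) {
          StringBuilder sb = new StringBuilder();
          for (int v : row) {
            sb.append(v).append('\t');
          }
          sb.setLength(sb.length() - 1);  // trim the trailing tab
          rs.add(sb.toString());
        }
        System.out.println(rs);  // ["3\t4", "1\t2"] -- caller's order preserved
      }
    }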
{"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00002"}, + {"{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00003"}, }; checkExpected(rs, expected3, "Unexpected row count after ctas from union all query"); @@ -269,9 +269,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver String expected[][] = { {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"}, - {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"}, {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0003/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t7\t8", "/delta_0000001_0000001_0002/bucket_00001"}, }; checkExpected(rs, expected, "Unexpected row count after ctas"); } http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/org/apache/hadoop/hive/ql/parse/authorization/TestPrivilegesV1.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/authorization/TestPrivilegesV1.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/authorization/TestPrivilegesV1.java index 4a33885..1fa11fc 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/authorization/TestPrivilegesV1.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/authorization/TestPrivilegesV1.java @@ -41,9 +41,9 @@ public class TestPrivilegesV1 extends PrivilegesTestBase{ public void setup() throws Exception { queryState = new QueryState.Builder().build(); db = Mockito.mock(Hive.class); + HiveConf hiveConf = queryState.getConf(); table = new Table(DB, TABLE); partition = new Partition(table); - HiveConf hiveConf = queryState.getConf(); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q b/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q index 75bd579..6d5e2ac 100644 --- a/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q +++ b/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q @@ -47,7 +47,7 @@ CREATE TABLE harbucket(key INT) PARTITIONED by (ds STRING) CLUSTERED BY (key) INTO 10 BUCKETS; -INSERT OVERWRITE TABLE harbucket PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key < 50; +INSERT OVERWRITE TABLE harbucket PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key > 50; SELECT key FROM harbucket TABLESAMPLE(BUCKET 1 OUT OF 10) SORT BY key; ALTER TABLE tstsrcpart ARCHIVE PARTITION (ds='2008-04-08', hr='12'); @@ -59,7 +59,7 @@ SELECT key FROM harbucket TABLESAMPLE(BUCKET 1 OUT OF 10) SORT BY key; CREATE TABLE old_name(key INT) PARTITIONED by (ds STRING); -INSERT OVERWRITE TABLE old_name PARTITION(ds='1') SELECT CAST(key AS 
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q b/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q
index 75bd579..6d5e2ac 100644
--- a/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q
+++ b/ql/src/test/queries/clientpositive/archive_excludeHadoop20.q
@@ -47,7 +47,7 @@ CREATE TABLE harbucket(key INT) PARTITIONED by (ds STRING)
 CLUSTERED BY (key) INTO 10 BUCKETS;
 
-INSERT OVERWRITE TABLE harbucket PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key < 50;
+INSERT OVERWRITE TABLE harbucket PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key > 50;
 
 SELECT key FROM harbucket TABLESAMPLE(BUCKET 1 OUT OF 10) SORT BY key;
 ALTER TABLE tstsrcpart ARCHIVE PARTITION (ds='2008-04-08', hr='12');
@@ -59,7 +59,7 @@ SELECT key FROM harbucket TABLESAMPLE(BUCKET 1 OUT OF 10) SORT BY key;
 
 CREATE TABLE old_name(key INT) PARTITIONED by (ds STRING);
 
-INSERT OVERWRITE TABLE old_name PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key < 50;
+INSERT OVERWRITE TABLE old_name PARTITION(ds='1') SELECT CAST(key AS INT) AS a FROM tstsrc WHERE key > 50;
 ALTER TABLE old_name ARCHIVE PARTITION (ds='1');
 SELECT SUM(hash(col)) FROM (SELECT transform(*) using 'tr "\t" "_"' AS col FROM (SELECT * FROM old_name WHERE ds='1') subq1) subq2;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/bucket_many.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_many.q b/ql/src/test/queries/clientpositive/bucket_many.q
index 6cd3004..8abcdc7 100644
--- a/ql/src/test/queries/clientpositive/bucket_many.q
+++ b/ql/src/test/queries/clientpositive/bucket_many.q
@@ -12,6 +12,6 @@ insert overwrite table bucket_many select * from src;
 
 explain
-select * from bucket_many tablesample (bucket 1 out of 256) s;
+select * from bucket_many tablesample (bucket 2 out of 256) s;
 
-select * from bucket_many tablesample (bucket 1 out of 256) s;
+select * from bucket_many tablesample (bucket 2 out of 256) s;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
index 725dd4c..5622ce2 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
@@ -227,7 +227,7 @@ from tab1 a join tab_part b on a.key = b.key;
 
 -- No map joins should be created.
 set hive.convert.join.bucket.mapjoin.tez = false;
-set hive.auto.convert.join.noconditionaltask.size=1500;
+set hive.auto.convert.join.noconditionaltask.size=15000;
 explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;
 set hive.convert.join.bucket.mapjoin.tez = true;
 explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/bucket_num_reducers.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers.q b/ql/src/test/queries/clientpositive/bucket_num_reducers.q
index 7483e5e..460ff1a 100644
--- a/ql/src/test/queries/clientpositive/bucket_num_reducers.q
+++ b/ql/src/test/queries/clientpositive/bucket_num_reducers.q
@@ -7,11 +7,10 @@ set mapred.reduce.tasks = 10;
 -- and uses a post-hook to confirm that 10 tasks were created
 
 CREATE TABLE bucket_nr(key int, value string) CLUSTERED BY (key) INTO 50 BUCKETS;
-set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
-set VerifyNumReducersHook.num.reducers=10;
+explain extended insert overwrite table bucket_nr
+  select * from src;
 
 insert overwrite table bucket_nr select * from src;
-set hive.exec.post.hooks=;
 
 drop table bucket_nr;
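For context on the removed hook lines in the two bucket_num_reducers tests: hive.exec.post.hooks accepts classes run after query execution, and VerifyNumReducersHook asserted on the reducer count; the tests now surface the same information through explain extended output instead. A rough sketch of the interface such a post-hook implements (illustrative, not VerifyNumReducersHook's actual code):

    import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
    import org.apache.hadoop.hive.ql.hooks.HookContext;

    public class LoggingPostHook implements ExecuteWithHookContext {
      @Override
      public void run(HookContext hookContext) throws Exception {
        // A real verifier would walk hookContext.getCompleteTaskList()
        // and count reduce tasks; here we only log the query id.
        System.out.println("completed: " + hookContext.getQueryPlan().getQueryId());
      }
    }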
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/bucket_num_reducers2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers2.q b/ql/src/test/queries/clientpositive/bucket_num_reducers2.q
index 2c4e697..dc8084c 100644
--- a/ql/src/test/queries/clientpositive/bucket_num_reducers2.q
+++ b/ql/src/test/queries/clientpositive/bucket_num_reducers2.q
@@ -6,10 +6,10 @@ set hive.exec.reducers.max = 2;
 -- table with 3 buckets, and uses a post-hook to confirm that 1 reducer was used
 
 CREATE TABLE test_table(key int, value string) CLUSTERED BY (key) INTO 3 BUCKETS;
-set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
-set VerifyNumReducersHook.num.reducers=1;
+explain extended insert overwrite table test_table
+  select * from src;
 
 insert overwrite table test_table select * from src;
-set hive.exec.post.hooks=;
+
+drop table test_table;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
index 9776785..51b5885 100644
--- a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
+++ b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
@@ -10,8 +10,6 @@ set mapred.reduce.tasks = 2;
 drop table if exists bucket_nr_acid2;
 create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true');
-set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
-set VerifyNumReducersHook.num.reducers=2;
 
 -- txn X write to b0 + b1
 insert into bucket_nr_acid2 values(0,1),(1,1);
@@ -27,7 +25,6 @@ insert into bucket_nr_acid2 values(2,4),(3,4);
 
 update bucket_nr_acid2 set b = -1;
-set hive.exec.post.hooks=;
 
 select * from bucket_nr_acid2 order by a, b;
 drop table bucket_nr_acid2;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/insert_update_delete.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_update_delete.q b/ql/src/test/queries/clientpositive/insert_update_delete.q
index bd9f777..06f4db6 100644
--- a/ql/src/test/queries/clientpositive/insert_update_delete.q
+++ b/ql/src/test/queries/clientpositive/insert_update_delete.q
@@ -3,7 +3,6 @@ set hive.mapred.mode=nonstrict;
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
-
 create table acid_iud(a int, b varchar(128)) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
 
 insert into table acid_iud select cint, cast(cstring1 as varchar(128)) from alltypesorc where cint < 0 order by cint limit 10;
http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/sample10.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/sample10.q b/ql/src/test/queries/clientpositive/sample10.q
index 7c2de2e..b0aab14 100644
--- a/ql/src/test/queries/clientpositive/sample10.q
+++ b/ql/src/test/queries/clientpositive/sample10.q
@@ -17,9 +17,9 @@ create table srcpartbucket (key string, value string) partitioned by (ds string,
 insert overwrite table srcpartbucket partition(ds, hr) select * from srcpart where ds is not null and key < 10;
 
 explain extended
-select ds, count(1) from srcpartbucket tablesample (bucket 1 out of 4 on key) where ds is not null group by ds ORDER BY ds ASC;
+select ds, count(1) from srcpartbucket tablesample (bucket 2 out of 4 on key) where ds is not null group by ds ORDER BY ds ASC;
 
-select ds, count(1) from srcpartbucket tablesample (bucket 1 out of 4 on key) where ds is not null group by ds ORDER BY ds ASC;
+select ds, count(1) from srcpartbucket tablesample (bucket 2 out of 4 on key) where ds is not null group by ds ORDER BY ds ASC;
 
 select ds, count(1) from srcpartbucket tablesample (bucket 1 out of 2 on key) where ds is not null group by ds ORDER BY ds ASC;

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q
index d318f7d..0500a62 100644
--- a/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q
+++ b/ql/src/test/queries/clientpositive/tez_dynpart_hashjoin_3.q
@@ -16,8 +16,8 @@ limit 1;
 
 set hive.auto.convert.join=true;
 set hive.optimize.dynamic.partition.hashjoin=true;
-set hive.auto.convert.join.noconditionaltask.size=200000;
-set hive.exec.reducers.bytes.per.reducer=200000;
+set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.exec.reducers.bytes.per.reducer=2000;
 
 explain
 select a.*

http://git-wip-us.apache.org/repos/asf/hive/blob/091fd962/ql/src/test/queries/clientpositive/tez_smb_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_1.q b/ql/src/test/queries/clientpositive/tez_smb_1.q
index ecfb0dc..0f8f22f 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_1.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_1.q
@@ -34,7 +34,7 @@ select key,value from srcbucket_mapjoin;
 
 set hive.convert.join.bucket.mapjoin.tez = true;
 set hive.auto.convert.sortmerge.join = true;
-set hive.auto.convert.join.noconditionaltask.size=500;
+set hive.auto.convert.join.noconditionaltask.size=50;
 
 explain
 select count(*) from tab s1 join tab s3 on s1.key=s3.key;
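A closing note on the noconditionaltask.size tweaks in the last two files: hive.auto.convert.join.noconditionaltask.size is the total-small-table byte threshold below which Hive converts a join to an unconditional map join, and the tests rescale it to keep exercising the intended plan shape. A sketch of setting it programmatically, assuming the HiveConf enum name used on this branch:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class JoinThresholdDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // hive.auto.convert.join.noconditionaltask.size, as adjusted by the .q files above
        conf.setLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD, 15000L);
        System.out.println(conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
      }
    }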