http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index c832cdb..5733688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorCopyRow; import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator; @@ -55,14 +54,17 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTabl import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; import 
org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -124,6 +126,10 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); // a mixture of input big table columns and new scratch columns. protected VectorizationContext vOutContext; + protected VectorMapJoinVariation vectorMapJoinVariation; + protected HashTableKind hashTableKind; + protected HashTableKeyType hashTableKeyType; + // The output column projection of the vectorized row batch. And, the type infos of the output // columns. protected int[] outputProjection; @@ -149,28 +155,70 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); protected String[] bigTableValueColumnNames; protected TypeInfo[] bigTableValueTypeInfos; - // This is a mapping of which big table columns (input and key/value expressions) will be - // part of the big table portion of the join output result. - protected VectorColumnOutputMapping bigTableRetainedMapping; + /* + * NOTE: + * The Big Table key columns are from the key expressions. + * The Big Table value columns are from the getExpr(posBigTable) expressions. + * Any calculations needed for those will be scratch columns. + * + * The Small Table key and value output columns are scratch columns. 
+ * + * Big Table Retain Column Map / TypeInfos: + * Any Big Table Batch columns that will be in the output result. + * 0, 1, or more Column Nums and TypeInfos + * + * Non Outer Small Table Key Mapping: + * For non-[FULL] OUTER MapJoin, when Big Table key columns are not retained for the output + * result but are needed for the Small Table output result, they are put in this mapping + * as they are required for copying rows to the overflow batch. + * + * Outer Small Table Key Mapping + * For [FULL] OUTER MapJoin, the mapping for any Small Table key columns needed for the + * output result from the Big Table key columns. The Big Table keys cannot be projected since + * on NOMATCH there must be a physical column present to hold the non-match NULL. + * + * Full Outer Small Table Key Mapping + * For FULL OUTER MapJoin, the mapping from any needed Small Table key columns to their area + * in the output result. + * + * For deserializing a FULL OUTER non-match Small Table key into the output result. + * Can be partial or empty if some or all Small Table key columns are not retained. + * + * Small Table Value Mapping + * The mapping from Small Table value columns to their area in the output result. + * + * For deserializing Small Table value into the output result. + * + * It is the Small Table value index to output column numbers and TypeInfos. + * That is, a mapping of the LazyBinary field order to output batch scratch columns for the + * small table portion. + * Or, to use the output column nums for OUTER Small Table value NULLs. + * + */ + protected int[] bigTableRetainColumnMap; + protected TypeInfo[] bigTableRetainTypeInfos; + + protected int[] nonOuterSmallTableKeyColumnMap; + protected TypeInfo[] nonOuterSmallTableKeyTypeInfos; - // This is a mapping of which keys will be copied from the big table (input and key expressions) - // to the small table result portion of the output for outer join. 
- protected VectorColumnOutputMapping bigTableOuterKeyMapping; + protected VectorColumnOutputMapping outerSmallTableKeyMapping; - // This is a mapping of the values in the small table hash table that will be copied to the - // small table result portion of the output. That is, a mapping of the LazyBinary field order - // to output batch scratch columns for the small table portion. - protected VectorColumnSourceMapping smallTableMapping; + protected VectorColumnSourceMapping fullOuterSmallTableKeyMapping; + protected VectorColumnSourceMapping smallTableValueMapping; + + // The MapJoin output result projection for both the Big Table input batch and the overflow batch. protected VectorColumnSourceMapping projectionMapping; // These are the output columns for the small table and the outer small table keys. - protected int[] smallTableOutputVectorColumns; - protected int[] bigTableOuterKeyOutputVectorColumns; + protected int[] outerSmallTableKeyColumnMap; + protected int[] smallTableValueColumnMap; // These are the columns in the big and small table that are ByteColumnVector columns. // We create data buffers for these columns so we can copy strings into those columns by value. protected int[] bigTableByteColumnVectorColumns; + protected int[] nonOuterSmallTableKeyByteColumnVectorColumns; + protected int[] outerSmallTableKeyByteColumnVectorColumns; protected int[] smallTableByteColumnVectorColumns; // The above members are initialized by the constructor and must not be @@ -186,13 +234,22 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); // portion of the join output. protected transient VectorCopyRow bigTableRetainedVectorCopy; + // This helper object deserializes BinarySortable format small table keys into columns of a row + // in a vectorized row batch. 
+ protected int[] allSmallTableKeyColumnNums; + protected boolean[] allSmallTableKeyColumnIncluded; + protected transient VectorDeserializeRow<BinarySortableDeserializeRead> smallTableKeyOuterVectorDeserializeRow; + + protected transient VectorCopyRow nonOuterSmallTableKeyVectorCopy; + + // UNDONE // A helper object that efficiently copies the big table key columns (input or key expressions) - // that appear in the small table portion of the join output for outer joins. - protected transient VectorCopyRow bigTableVectorCopyOuterKeys; + // that appear in the small table portion of the join output. + protected transient VectorCopyRow outerSmallTableKeyVectorCopy; // This helper object deserializes LazyBinary format small table values into columns of a row // in a vectorized row batch. - protected transient VectorDeserializeRow<LazyBinaryDeserializeRead> smallTableVectorDeserializeRow; + protected transient VectorDeserializeRow<LazyBinaryDeserializeRead> smallTableValueVectorDeserializeRow; // This a 2nd batch with the same "column schema" as the big table batch that can be used to // build join output results in. If we can create some join output results in the big table @@ -207,6 +264,9 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); // Whether the native vectorized map join operator has performed its common setup. protected transient boolean needCommonSetup; + // Whether the native vectorized map join operator has performed its first batch setup. + protected transient boolean needFirstBatchSetup; + // Whether the native vectorized map join operator has performed its // native vector map join hash table setup. protected transient boolean needHashTableSetup; @@ -214,6 +274,9 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); // The small table hash table for the native vectorized map join operator. 
protected transient VectorMapJoinHashTable vectorMapJoinHashTable; + protected transient long batchCounter; + protected transient long rowCounter; + /** Kryo ctor. */ protected VectorMapJoinCommonOperator() { super(); @@ -246,9 +309,9 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); isOuterJoin = !desc.getNoOuterJoin(); - Map<Byte, List<ExprNodeDesc>> filterExpressions = desc.getFilters(); - bigTableFilterExpressions = vContext.getVectorExpressions(filterExpressions.get(posBigTable), - VectorExpressionDescriptor.Mode.FILTER); + vectorMapJoinVariation = this.vectorDesc.getVectorMapJoinVariation(); + hashTableKind = this.vectorDesc.getHashTableKind(); + hashTableKeyType = this.vectorDesc.getHashTableKeyType(); bigTableKeyColumnMap = vectorMapJoinInfo.getBigTableKeyColumnMap(); bigTableKeyColumnNames = vectorMapJoinInfo.getBigTableKeyColumnNames(); @@ -260,11 +323,19 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); bigTableValueTypeInfos = vectorMapJoinInfo.getBigTableValueTypeInfos(); bigTableValueExpressions = vectorMapJoinInfo.getSlimmedBigTableValueExpressions(); - bigTableRetainedMapping = vectorMapJoinInfo.getBigTableRetainedMapping(); + bigTableFilterExpressions = vectorMapJoinInfo.getBigTableFilterExpressions(); + + bigTableRetainColumnMap = vectorMapJoinInfo.getBigTableRetainColumnMap(); + bigTableRetainTypeInfos = vectorMapJoinInfo.getBigTableRetainTypeInfos(); + + nonOuterSmallTableKeyColumnMap = vectorMapJoinInfo.getNonOuterSmallTableKeyColumnMap(); + nonOuterSmallTableKeyTypeInfos = vectorMapJoinInfo.getNonOuterSmallTableKeyTypeInfos(); + + outerSmallTableKeyMapping = vectorMapJoinInfo.getOuterSmallTableKeyMapping(); - bigTableOuterKeyMapping = vectorMapJoinInfo.getBigTableOuterKeyMapping(); + fullOuterSmallTableKeyMapping = vectorMapJoinInfo.getFullOuterSmallTableKeyMapping(); - smallTableMapping = 
vectorMapJoinInfo.getSmallTableMapping(); + smallTableValueMapping = vectorMapJoinInfo.getSmallTableValueMapping(); projectionMapping = vectorMapJoinInfo.getProjectionMapping(); @@ -273,47 +344,96 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); protected void determineCommonInfo(boolean isOuter) throws HiveException { - bigTableOuterKeyOutputVectorColumns = bigTableOuterKeyMapping.getOutputColumns(); - smallTableOutputVectorColumns = smallTableMapping.getOutputColumns(); + outerSmallTableKeyColumnMap = outerSmallTableKeyMapping.getOutputColumns(); + + smallTableValueColumnMap = smallTableValueMapping.getOutputColumns(); // Which big table and small table columns are ByteColumnVector and need have their data buffer // to be manually reset for some join result processing? - bigTableByteColumnVectorColumns = getByteColumnVectorColumns(bigTableOuterKeyMapping); + bigTableByteColumnVectorColumns = + getByteColumnVectorColumns(bigTableRetainColumnMap, bigTableRetainTypeInfos); + + nonOuterSmallTableKeyByteColumnVectorColumns = + getByteColumnVectorColumns(nonOuterSmallTableKeyColumnMap, nonOuterSmallTableKeyTypeInfos); + + outerSmallTableKeyByteColumnVectorColumns = + getByteColumnVectorColumns(outerSmallTableKeyMapping); - smallTableByteColumnVectorColumns = getByteColumnVectorColumns(smallTableMapping); + smallTableByteColumnVectorColumns = + getByteColumnVectorColumns(smallTableValueMapping); outputProjection = projectionMapping.getOutputColumns(); outputTypeInfos = projectionMapping.getTypeInfos(); - if (LOG.isDebugEnabled()) { + if (LOG.isInfoEnabled()) { int[] orderDisplayable = new int[order.length]; for (int i = 0; i < order.length; i++) { orderDisplayable[i] = (int) order[i]; } - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor order " + Arrays.toString(orderDisplayable)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor posBigTable " + (int) posBigTable); - LOG.debug(getLoggingPrefix() 
+ " VectorMapJoinCommonOperator constructor posSingleVectorMapJoinSmallTable " + (int) posSingleVectorMapJoinSmallTable); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableKeyColumnMap " + Arrays.toString(bigTableKeyColumnMap)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableKeyColumnNames " + Arrays.toString(bigTableKeyColumnNames)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableKeyTypeInfos " + Arrays.toString(bigTableKeyTypeInfos)); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableValueColumnMap " + Arrays.toString(bigTableValueColumnMap)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableValueColumnNames " + Arrays.toString(bigTableValueColumnNames)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableValueTypeNames " + Arrays.toString(bigTableValueTypeInfos)); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableRetainedMapping " + bigTableRetainedMapping.toString()); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableOuterKeyMapping " + bigTableOuterKeyMapping.toString()); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor smallTableMapping " + smallTableMapping.toString()); - - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor bigTableByteColumnVectorColumns " + Arrays.toString(bigTableByteColumnVectorColumns)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor smallTableByteColumnVectorColumns " + Arrays.toString(smallTableByteColumnVectorColumns)); + LOG.info(getLoggingPrefix() + " order " + + Arrays.toString(orderDisplayable)); + LOG.info(getLoggingPrefix() + " posBigTable " + + (int) posBigTable); + LOG.info(getLoggingPrefix() + " posSingleVectorMapJoinSmallTable " + + (int) posSingleVectorMapJoinSmallTable); + 
+ LOG.info(getLoggingPrefix() + " bigTableKeyColumnMap " + + Arrays.toString(bigTableKeyColumnMap)); + LOG.info(getLoggingPrefix() + " bigTableKeyColumnNames " + + Arrays.toString(bigTableKeyColumnNames)); + LOG.info(getLoggingPrefix() + " bigTableKeyTypeInfos " + + Arrays.toString(bigTableKeyTypeInfos)); + + LOG.info(getLoggingPrefix() + " bigTableValueColumnMap " + + Arrays.toString(bigTableValueColumnMap)); + LOG.info(getLoggingPrefix() + " bigTableValueColumnNames " + + Arrays.toString(bigTableValueColumnNames)); + LOG.info(getLoggingPrefix() + " bigTableValueTypeNames " + + Arrays.toString(bigTableValueTypeInfos)); + + LOG.info(getLoggingPrefix() + " getBigTableRetainColumnMap " + + Arrays.toString(bigTableRetainColumnMap)); + LOG.info(getLoggingPrefix() + " bigTableRetainTypeInfos " + + Arrays.toString(bigTableRetainTypeInfos)); + + LOG.info(getLoggingPrefix() + " nonOuterSmallTableKeyColumnMap " + + Arrays.toString(nonOuterSmallTableKeyColumnMap)); + LOG.info(getLoggingPrefix() + " nonOuterSmallTableKeyTypeInfos " + + Arrays.toString(nonOuterSmallTableKeyTypeInfos)); + + LOG.info(getLoggingPrefix() + " outerSmallTableKeyMapping " + + outerSmallTableKeyMapping.toString()); + + LOG.info(getLoggingPrefix() + " fullOuterSmallTableKeyMapping " + + fullOuterSmallTableKeyMapping.toString()); + + LOG.info(getLoggingPrefix() + " smallTableValueMapping " + + smallTableValueMapping.toString()); + + LOG.info(getLoggingPrefix() + " bigTableByteColumnVectorColumns " + + Arrays.toString(bigTableByteColumnVectorColumns)); + LOG.info(getLoggingPrefix() + " smallTableByteColumnVectorColumns " + + Arrays.toString(smallTableByteColumnVectorColumns)); + + LOG.info(getLoggingPrefix() + " outputProjection " + + Arrays.toString(outputProjection)); + LOG.info(getLoggingPrefix() + " outputTypeInfos " + + Arrays.toString(outputTypeInfos)); + + LOG.info(getLoggingPrefix() + " mapJoinDesc.getKeysString " + + conf.getKeysString()); + if (conf.getValueIndices() != null) { + for 
(Entry<Byte, int[]> entry : conf.getValueIndices().entrySet()) { + LOG.info(getLoggingPrefix() + " mapJoinDesc.getValueIndices +" + + (int) entry.getKey() + " " + Arrays.toString(entry.getValue())); + } + } + LOG.info(getLoggingPrefix() + " mapJoinDesc.getExprs " + + conf.getExprs().toString()); + LOG.info(getLoggingPrefix() + " mapJoinDesc.getRetainList " + + conf.getRetainList().toString()); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor outputProjection " + Arrays.toString(outputProjection)); - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor outputTypeInfos " + Arrays.toString(outputTypeInfos)); } setupVOutContext(conf.getOutputColumnNames()); @@ -323,11 +443,14 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); * Determine from a mapping which columns are BytesColumnVector columns. */ private int[] getByteColumnVectorColumns(VectorColumnMapping mapping) { + return getByteColumnVectorColumns(mapping.getOutputColumns(), mapping.getTypeInfos()); + } + + private int[] getByteColumnVectorColumns(int[] outputColumns, TypeInfo[] typeInfos) { + // Search mapping for any strings and return their output columns. 
ArrayList<Integer> list = new ArrayList<Integer>(); - int count = mapping.getCount(); - int[] outputColumns = mapping.getOutputColumns(); - TypeInfo[] typeInfos = mapping.getTypeInfos(); + final int count = outputColumns.length; for (int i = 0; i < count; i++) { int outputColumn = outputColumns[i]; String typeName = typeInfos[i].getTypeName(); @@ -345,10 +468,12 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); */ protected void setupVOutContext(List<String> outputColumnNames) { if (LOG.isDebugEnabled()) { - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor outputColumnNames " + outputColumnNames); + LOG.debug(getLoggingPrefix() + " outputColumnNames " + outputColumnNames); } if (outputColumnNames.size() != outputProjection.length) { - throw new RuntimeException("Output column names " + outputColumnNames + " length and output projection " + Arrays.toString(outputProjection) + " / " + Arrays.toString(outputTypeInfos) + " length mismatch"); + throw new RuntimeException("Output column names " + outputColumnNames + + " length and output projection " + Arrays.toString(outputProjection) + + " / " + Arrays.toString(outputTypeInfos) + " length mismatch"); } vOutContext.resetProjectionColumns(); for (int i = 0; i < outputColumnNames.size(); ++i) { @@ -357,7 +482,8 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); vOutContext.addProjectionColumn(columnName, outputColumn); if (LOG.isDebugEnabled()) { - LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator constructor addProjectionColumn " + i + " columnName " + columnName + " outputColumn " + outputColumn); + LOG.debug(getLoggingPrefix() + " addProjectionColumn " + i + " columnName " + columnName + + " outputColumn " + outputColumn); } } } @@ -386,9 +512,50 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); return hashTableLoader; } + /* + * Do FULL OUTER MapJoin operator initialization. 
+ */ + private void initializeFullOuterObjects() throws HiveException { + + // The Small Table key type info is the same as Big Table's. + TypeInfo[] smallTableKeyTypeInfos = bigTableKeyTypeInfos; + final int allKeysSize = smallTableKeyTypeInfos.length; + + /* + * The VectorMapJoinFullOuter{Long|MultiKey|String}Operator outputs 0, 1, or more + * Small Key columns in the join result. + */ + allSmallTableKeyColumnNums = new int[allKeysSize]; + Arrays.fill(allSmallTableKeyColumnNums, -1); + allSmallTableKeyColumnIncluded = new boolean[allKeysSize]; + + final int outputKeysSize = fullOuterSmallTableKeyMapping.getCount(); + int[] outputKeyNums = fullOuterSmallTableKeyMapping.getInputColumns(); + int[] outputKeyOutputColumns = fullOuterSmallTableKeyMapping.getOutputColumns(); + for (int i = 0; i < outputKeysSize; i++) { + final int outputKeyNum = outputKeyNums[i]; + allSmallTableKeyColumnNums[outputKeyNum] = outputKeyOutputColumns[i]; + allSmallTableKeyColumnIncluded[outputKeyNum] = true; + } + + if (hashTableKeyType == HashTableKeyType.MULTI_KEY && + outputKeysSize > 0) { + + smallTableKeyOuterVectorDeserializeRow = + new VectorDeserializeRow<BinarySortableDeserializeRead>( + new BinarySortableDeserializeRead( + smallTableKeyTypeInfos, + /* useExternalBuffer */ true)); + smallTableKeyOuterVectorDeserializeRow.init( + allSmallTableKeyColumnNums, allSmallTableKeyColumnIncluded); + } + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + VectorExpression.doTransientInit(bigTableFilterExpressions); VectorExpression.doTransientInit(bigTableKeyExpressions); VectorExpression.doTransientInit(bigTableValueExpressions); @@ -405,23 +572,34 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); /* * Create our vectorized copy row and deserialize row helper objects. 
*/ - if (smallTableMapping.getCount() > 0) { - smallTableVectorDeserializeRow = + if (vectorMapJoinVariation == VectorMapJoinVariation.FULL_OUTER) { + initializeFullOuterObjects(); + } + + if (smallTableValueMapping.getCount() > 0) { + smallTableValueVectorDeserializeRow = new VectorDeserializeRow<LazyBinaryDeserializeRead>( new LazyBinaryDeserializeRead( - smallTableMapping.getTypeInfos(), + smallTableValueMapping.getTypeInfos(), /* useExternalBuffer */ true)); - smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns()); + smallTableValueVectorDeserializeRow.init(smallTableValueMapping.getOutputColumns()); } - if (bigTableRetainedMapping.getCount() > 0) { + if (bigTableRetainColumnMap.length > 0) { bigTableRetainedVectorCopy = new VectorCopyRow(); - bigTableRetainedVectorCopy.init(bigTableRetainedMapping); + bigTableRetainedVectorCopy.init( + bigTableRetainColumnMap, bigTableRetainTypeInfos); } - if (bigTableOuterKeyMapping.getCount() > 0) { - bigTableVectorCopyOuterKeys = new VectorCopyRow(); - bigTableVectorCopyOuterKeys.init(bigTableOuterKeyMapping); + if (nonOuterSmallTableKeyColumnMap.length > 0) { + nonOuterSmallTableKeyVectorCopy = new VectorCopyRow(); + nonOuterSmallTableKeyVectorCopy.init( + nonOuterSmallTableKeyColumnMap, nonOuterSmallTableKeyTypeInfos); + } + + if (outerSmallTableKeyMapping.getCount() > 0) { + outerSmallTableKeyVectorCopy = new VectorCopyRow(); + outerSmallTableKeyVectorCopy.init(outerSmallTableKeyMapping); } /* @@ -430,6 +608,7 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); overflowBatch = setupOverflowBatch(); needCommonSetup = true; + needFirstBatchSetup = true; needHashTableSetup = true; if (LOG.isDebugEnabled()) { @@ -553,29 +732,46 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); } /* - * Common one time setup by native vectorized map join operator's processOp. + * Common one time setup for Native Vector MapJoin operator. 
*/ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { + protected void commonSetup() throws HiveException { - if (LOG.isDebugEnabled()) { - LOG.debug("VectorMapJoinInnerCommonOperator commonSetup begin..."); - displayBatchColumns(batch, "batch"); - displayBatchColumns(overflowBatch, "overflowBatch"); + /* + * Make sure big table BytesColumnVectors have room for string values in the overflow batch... + */ + for (int column: bigTableByteColumnVectorColumns) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) overflowBatch.cols[column]; + bytesColumnVector.initBuffer(); } - // Make sure big table BytesColumnVectors have room for string values in the overflow batch... - for (int column: bigTableByteColumnVectorColumns) { + for (int column : nonOuterSmallTableKeyByteColumnVectorColumns) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) overflowBatch.cols[column]; + bytesColumnVector.initBuffer(); + } + + for (int column : outerSmallTableKeyByteColumnVectorColumns) { BytesColumnVector bytesColumnVector = (BytesColumnVector) overflowBatch.cols[column]; bytesColumnVector.initBuffer(); } + for (int column: smallTableByteColumnVectorColumns) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) overflowBatch.cols[column]; + bytesColumnVector.initBuffer(); + } + + batchCounter = 0; + rowCounter = 0; + } + + /* + * Common one time setup by native vectorized map join operator's first batch. + */ + public void firstBatchSetup(VectorizedRowBatch batch) throws HiveException { // Make sure small table BytesColumnVectors have room for string values in the big table and // overflow batchs... 
for (int column: smallTableByteColumnVectorColumns) { BytesColumnVector bytesColumnVector = (BytesColumnVector) batch.cols[column]; bytesColumnVector.initBuffer(); - bytesColumnVector = (BytesColumnVector) overflowBatch.cols[column]; - bytesColumnVector.initBuffer(); } // Setup a scratch batch that will be used to play back big table rows that were spilled @@ -583,6 +779,67 @@ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); spillReplayBatch = VectorizedBatchUtil.makeLike(batch); } + /* + * Perform any Native Vector MapJoin operator specific hash table setup. + */ + public void hashTableSetup() throws HiveException { + } + + /* + * Perform the Native Vector MapJoin operator work. + */ + public abstract void processBatch(VectorizedRowBatch batch) throws HiveException; + + /* + * Common process method for all Native Vector MapJoin operators. + * + * Do common initialization work and invoke the override-able common setup methods. + * + * Then, invoke the processBatch override method to do the operator work. + */ + @Override + public void process(Object row, int tag) throws HiveException { + + VectorizedRowBatch batch = (VectorizedRowBatch) row; + alias = (byte) tag; + + if (needCommonSetup) { + + // Our one time process method initialization. + commonSetup(); + + needCommonSetup = false; + } + + if (needFirstBatchSetup) { + + // Our one time first-batch method initialization. + firstBatchSetup(batch); + + needFirstBatchSetup = false; + } + + if (needHashTableSetup) { + + // Setup our hash table specialization. It will be the first time the process + // method is called, or after a Hybrid Grace reload. 
+ + hashTableSetup(); + + needHashTableSetup = false; + } + + batchCounter++; + + if (batch.size == 0) { + return; + } + + rowCounter += batch.size; + + processBatch(batch); + } + protected void displayBatchColumns(VectorizedRowBatch batch, String batchName) { LOG.debug(getLoggingPrefix() + " VectorMapJoinCommonOperator commonSetup " + batchName + " column count " + batch.numCols); for (int column = 0; column < batch.numCols; column++) {
http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterLongOperator.java new file mode 100644 index 0000000..d08c2f0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterLongOperator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; + +/** + * Specialized class for doing a Native Vector FULL OUTER MapJoin on a Single-Column Long + * using a hash map. 
+ */ +public class VectorMapJoinFullOuterLongOperator extends VectorMapJoinOuterLongOperator { + private static final long serialVersionUID = 1L; + + //------------------------------------------------------------------------------------------------ + + private static final String CLASS_NAME = VectorMapJoinFullOuterLongOperator.class.getName(); + // private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + protected String getLoggingPrefix() { + return super.getLoggingPrefix(CLASS_NAME); + } + + //--------------------------------------------------------------------------- + // Pass-thru constructors. + // + + /** Kryo ctor. */ + protected VectorMapJoinFullOuterLongOperator() { + super(); + } + + public VectorMapJoinFullOuterLongOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorMapJoinFullOuterLongOperator(CompilationOpContext ctx, OperatorDesc conf, + VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { + super(ctx, conf, vContext, vectorDesc); + } + + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + // Turn on key matching. + fullOuterHashTableSetup(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterMultiKeyOperator.java new file mode 100644 index 0000000..8d5a4c2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterMultiKeyOperator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +// import org.slf4j.Logger; +// import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; + +/** + * Specialized class for doing a Native Vector FULL OUTER MapJoin on a Multi-Key + * using a hash map. + */ +public class VectorMapJoinFullOuterMultiKeyOperator extends VectorMapJoinOuterMultiKeyOperator { + + private static final long serialVersionUID = 1L; + + //------------------------------------------------------------------------------------------------ + + private static final String CLASS_NAME = VectorMapJoinFullOuterMultiKeyOperator.class.getName(); + // private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + protected String getLoggingPrefix() { + return super.getLoggingPrefix(CLASS_NAME); + } + + //--------------------------------------------------------------------------- + // Pass-thru constructors. + // + + /** Kryo ctor. 
*/ + protected VectorMapJoinFullOuterMultiKeyOperator() { + super(); + } + + public VectorMapJoinFullOuterMultiKeyOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorMapJoinFullOuterMultiKeyOperator(CompilationOpContext ctx, OperatorDesc conf, + VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { + super(ctx, conf, vContext, vectorDesc); + } + + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + // Turn on key matching. + fullOuterHashTableSetup(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterStringOperator.java new file mode 100644 index 0000000..78987f0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinFullOuterStringOperator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +// import org.slf4j.Logger; +// import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; + +/** + * Specialized class for doing a Native Vector FULL OUTER MapJoin on a Single-Column String + * using a hash map. + */ +public class VectorMapJoinFullOuterStringOperator extends VectorMapJoinOuterStringOperator { + + private static final long serialVersionUID = 1L; + + //------------------------------------------------------------------------------------------------ + + private static final String CLASS_NAME = VectorMapJoinOuterStringOperator.class.getName(); + // private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + protected String getLoggingPrefix() { + return super.getLoggingPrefix(CLASS_NAME); + } + + //--------------------------------------------------------------------------- + // Pass-thru constructors. + // + + /** Kryo ctor. */ + protected VectorMapJoinFullOuterStringOperator() { + super(); + } + + public VectorMapJoinFullOuterStringOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorMapJoinFullOuterStringOperator(CompilationOpContext ctx, OperatorDesc conf, + VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { + super(ctx, conf, vContext, vectorDesc); + } + + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + // Turn on key matching. 
+ fullOuterHashTableSetup(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 3821cc6..f5bb547 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; @@ -93,9 +92,6 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC private transient Thread ownThread; private transient int interruptCheckCounter = CHECK_INTERRUPT_PER_OVERFLOW_BATCHES; - // Debug display. - protected transient long batchCounter; - /** Kryo ctor. 
*/ protected VectorMapJoinGenerateResultOperator() { super(); @@ -124,13 +120,6 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC ownThread = Thread.currentThread(); } - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); - - batchCounter = 0; - - } - //------------------------------------------------------------------------------------------------ protected void performValueExpressions(VectorizedRowBatch batch, @@ -157,24 +146,24 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC batch.selectedInUse = saveSelectedInUse; } - protected void doSmallTableDeserializeRow(VectorizedRowBatch batch, int batchIndex, + protected void doSmallTableValueDeserializeRow(VectorizedRowBatch batch, int batchIndex, ByteSegmentRef byteSegmentRef, VectorMapJoinHashMapResult hashMapResult) throws HiveException { byte[] bytes = byteSegmentRef.getBytes(); int offset = (int) byteSegmentRef.getOffset(); int length = byteSegmentRef.getLength(); - smallTableVectorDeserializeRow.setBytes(bytes, offset, length); + smallTableValueVectorDeserializeRow.setBytes(bytes, offset, length); try { // Our hash tables are immutable. We can safely do by reference STRING, CHAR/VARCHAR, etc. 
- smallTableVectorDeserializeRow.deserializeByRef(batch, batchIndex); + smallTableValueVectorDeserializeRow.deserializeByRef(batch, batchIndex); } catch (Exception e) { throw new HiveException( "\nHashMapResult detail: " + hashMapResult.getDetailedHashMapResultPositionString() + "\nDeserializeRead detail: " + - smallTableVectorDeserializeRow.getDetailedReadPositionString(), + smallTableValueVectorDeserializeRow.getDetailedReadPositionString(), e); } } @@ -215,22 +204,23 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC for (int i = 0; i < duplicateCount; i++) { - int batchIndex = allMatchs[allMatchesIndex + i]; + final int batchIndex = allMatchs[allMatchesIndex + i]; - // Outer key copying is only used when we are using the input BigTable batch as the output. - // - if (bigTableVectorCopyOuterKeys != null) { - // Copy within row. - bigTableVectorCopyOuterKeys.copyByReference(batch, batchIndex, batch, batchIndex); + if (outerSmallTableKeyVectorCopy != null) { + + // For [FULL] OUTER MapJoin, copy Big Table keys to Small Table area within + // same batch by reference. + // + outerSmallTableKeyVectorCopy.copyByReference( + batch, batchIndex, + batch, batchIndex); } - if (smallTableVectorDeserializeRow != null) { - doSmallTableDeserializeRow(batch, batchIndex, + if (smallTableValueVectorDeserializeRow != null) { + doSmallTableValueDeserializeRow(batch, batchIndex, byteSegmentRef, hashMapResult); } - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, "generateHashMapResultSingleValue big table"); - // Use the big table row as output. 
batch.selected[numSel++] = batchIndex; } @@ -273,26 +263,45 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC for (int i = 0; i < duplicateCount; i++) { - int batchIndex = allMatchs[allMatchesIndex + i]; + final int batchIndex = allMatchs[allMatchesIndex + i]; ByteSegmentRef byteSegmentRef = hashMapResult.first(); while (byteSegmentRef != null) { // Copy the BigTable values into the overflow batch. Since the overflow batch may // not get flushed here, we must copy by value. - // Note this includes any outer join keys that need to go into the small table "area". + // if (bigTableRetainedVectorCopy != null) { - bigTableRetainedVectorCopy.copyByValue(batch, batchIndex, - overflowBatch, overflowBatch.size); + bigTableRetainedVectorCopy.copyByValue( + batch, batchIndex, + overflowBatch, overflowBatch.size); } - if (smallTableVectorDeserializeRow != null) { + if (nonOuterSmallTableKeyVectorCopy != null) { - doSmallTableDeserializeRow(overflowBatch, overflowBatch.size, - byteSegmentRef, hashMapResult); + // For non-[FULL] OUTER MapJoin, copy non-retained Big Table keys to the Big Table area + // across to overflow batch by value so Small Key projection will see its keys... + // + nonOuterSmallTableKeyVectorCopy.copyByValue( + batch, batchIndex, + overflowBatch, overflowBatch.size); } - // VectorizedBatchUtil.debugDisplayOneRow(overflowBatch, overflowBatch.size, "generateHashMapResultMultiValue overflow"); + if (outerSmallTableKeyVectorCopy != null) { + + // For [FULL] OUTER MapJoin, copy Big Table keys to Small Table area across + // to overflow batch by value. 
+ // + outerSmallTableKeyVectorCopy.copyByValue( + batch, batchIndex, + overflowBatch, overflowBatch.size); + } + + if (smallTableValueVectorDeserializeRow != null) { + + doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, + byteSegmentRef, hashMapResult); + } overflowBatch.size++; if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { @@ -333,8 +342,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC // Fill up as much of the overflow batch as possible with small table values. while (byteSegmentRef != null) { - if (smallTableVectorDeserializeRow != null) { - doSmallTableDeserializeRow(overflowBatch, overflowBatch.size, + if (smallTableValueVectorDeserializeRow != null) { + doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, byteSegmentRef, hashMapResult); } @@ -361,9 +370,40 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int batchIndex = allMatchs[allMatchesIndex + i]; if (bigTableRetainedVectorCopy != null) { + // The one big table row's values repeat. - bigTableRetainedVectorCopy.copyByReference(batch, batchIndex, overflowBatch, 0); - for (int column : bigTableRetainedMapping.getOutputColumns()) { + bigTableRetainedVectorCopy.copyByReference( + batch, batchIndex, + overflowBatch, 0); + for (int column : bigTableRetainColumnMap) { + overflowBatch.cols[column].isRepeating = true; + } + } + + if (nonOuterSmallTableKeyVectorCopy != null) { + + // For non-[FULL] OUTER MapJoin, copy non-retained Big Table keys to the Big Table area + // across to overflow batch by value so Small Key projection will see its keys... 
+ // + nonOuterSmallTableKeyVectorCopy.copyByValue( + batch, batchIndex, + overflowBatch, 0); + for (int column : nonOuterSmallTableKeyColumnMap) { + overflowBatch.cols[column].isRepeating = true; + } + } + + int[] outerSmallTableKeyColumnMap = null; + if (outerSmallTableKeyVectorCopy != null) { + + // For [FULL] OUTER MapJoin, copy Big Table keys to Small Table area across + // to overflow batch by value. + // + outerSmallTableKeyVectorCopy.copyByValue( + batch, batchIndex, + overflowBatch, 0); + outerSmallTableKeyColumnMap = outerSmallTableKeyMapping.getOutputColumns(); + for (int column : outerSmallTableKeyColumnMap) { overflowBatch.cols[column].isRepeating = true; } } @@ -373,10 +413,20 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC forwardOverflowNoReset(); // Hand reset the big table columns. - for (int column : bigTableRetainedMapping.getOutputColumns()) { + for (int column : bigTableRetainColumnMap) { + ColumnVector colVector = overflowBatch.cols[column]; + colVector.reset(); + } + for (int column : nonOuterSmallTableKeyColumnMap) { ColumnVector colVector = overflowBatch.cols[column]; colVector.reset(); } + if (outerSmallTableKeyColumnMap != null) { + for (int column : outerSmallTableKeyColumnMap) { + ColumnVector colVector = overflowBatch.cols[column]; + colVector.reset(); + } + } } byteSegmentRef = hashMapResult.next(); @@ -476,22 +526,16 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC } private void spillSerializeRow(VectorizedRowBatch batch, int batchIndex, - VectorMapJoinHashTableResult hashTableResult) throws IOException { - - int partitionId = hashTableResult.spillPartitionId(); + int partitionId) throws IOException { HybridHashTableContainer ht = (HybridHashTableContainer) mapJoinTables[posSingleVectorMapJoinSmallTable]; HashPartition hp = ht.getHashPartitions()[partitionId]; VectorRowBytesContainer rowBytesContainer = hp.getMatchfileRowBytesContainer(); Output output = 
rowBytesContainer.getOuputForRowBytes(); -// int offset = output.getLength(); bigTableVectorSerializeRow.setOutputAppend(output); bigTableVectorSerializeRow.serializeWrite(batch, batchIndex); -// int length = output.getLength() - offset; rowBytesContainer.finishRow(); - -// LOG.debug("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length); } protected void spillHashMapBatch(VectorizedRowBatch batch, @@ -509,8 +553,18 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int hashTableResultIndex = spillHashTableResultIndices[i]; VectorMapJoinHashTableResult hashTableResult = hashTableResults[hashTableResultIndex]; - spillSerializeRow(batch, batchIndex, hashTableResult); + spillSerializeRow(batch, batchIndex, hashTableResult.spillPartitionId()); + } + } + + protected void spillRow(VectorizedRowBatch batch, int batchIndex, int partitionId) + throws HiveException, IOException { + + if (bigTableVectorSerializeRow == null) { + setupSpillSerDe(batch); } + + spillSerializeRow(batch, batchIndex, partitionId); } protected void spillBatchRepeated(VectorizedRowBatch batch, @@ -525,7 +579,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC for (int logical = 0; logical < batch.size; logical++) { int batchIndex = (selectedInUse ? 
selected[logical] : logical); - spillSerializeRow(batch, batchIndex, hashTableResult); + spillSerializeRow(batch, batchIndex, hashTableResult.spillPartitionId()); } } @@ -541,8 +595,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC MapJoinTableContainer smallTable = spilledMapJoinTables[pos]; - vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf, - smallTable); + vectorMapJoinHashTable = + VectorMapJoinOptimizedCreateHashTable.createHashTable(conf, smallTable); needHashTableSetup = true; LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName()); @@ -637,7 +691,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC batch.projectionSize = outputProjection.length; batch.projectedColumns = outputProjection; - forward(batch, null, true); + vectorForward(batch); // Revert the projected columns back, because batch can be re-used by our parent operators. batch.projectionSize = originalProjectionSize; @@ -649,7 +703,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC * Forward the overflow batch and reset the batch. */ protected void forwardOverflow() throws HiveException { - forward(overflowBatch, null, true); + vectorForward(overflowBatch); overflowBatch.reset(); maybeCheckInterrupt(); } @@ -666,7 +720,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC * Forward the overflow batch, but do not reset the batch. 
*/ private void forwardOverflowNoReset() throws HiveException { - forward(overflowBatch, null, true); + vectorForward(overflowBatch); } /* @@ -679,6 +733,11 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC @Override public void closeOp(boolean aborted) throws HiveException { super.closeOp(aborted); + + // NOTE: The closeOp call on super MapJoinOperator can trigger Hybrid Grace additional + // NOTE: processing and also FULL OUTER MapJoin non-match Small Table result generation. So, + // NOTE: we flush the overflowBatch after the call. + // if (!aborted && overflowBatch.size > 0) { forwardOverflow(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java index f791d95..35ddddd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java @@ -103,25 +103,25 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator /* * Setup our inner big table only join specific members. */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); // Inner big-table only join specific. 
VectorMapJoinHashMultiSet baseHashMultiSet = (VectorMapJoinHashMultiSet) vectorMapJoinHashTable; - hashMultiSetResults = new VectorMapJoinHashMultiSetResult[batch.DEFAULT_SIZE]; + hashMultiSetResults = new VectorMapJoinHashMultiSetResult[VectorizedRowBatch.DEFAULT_SIZE]; for (int i = 0; i < hashMultiSetResults.length; i++) { hashMultiSetResults[i] = baseHashMultiSet.createHashMultiSetResult(); } - allMatchs = new int[batch.DEFAULT_SIZE]; + allMatchs = new int[VectorizedRowBatch.DEFAULT_SIZE]; - equalKeySeriesValueCounts = new long[batch.DEFAULT_SIZE]; - equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; + equalKeySeriesValueCounts = new long[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesAllMatchIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesDuplicateCounts = new int[VectorizedRowBatch.DEFAULT_SIZE]; - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; + spills = new int[VectorizedRowBatch.DEFAULT_SIZE]; + spillHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; } //----------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java index 678fa42..30a19b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java @@ -102,45 +102,36 @@ public class VectorMapJoinInnerBigOnlyLongOperator 
extends VectorMapJoinInnerBig // @Override - public void process(Object row, int tag) throws HiveException { + protected void commonSetup() throws HiveException { + super.commonSetup(); - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); - - /* - * Initialize Single-Column Long members for this specialized class. - */ + /* + * Initialize Single-Column Long members for this specialized class. + */ - singleJoinColumn = bigTableKeyColumnMap[0]; - - needCommonSetup = false; - } - - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. - - /* - * Get our Single-Column Long hash multi-set information for this specialized class. - */ + singleJoinColumn = bigTableKeyColumnMap[0]; + } - hashMultiSet = (VectorMapJoinLongHashMultiSet) vectorMapJoinHashTable; - useMinMax = hashMultiSet.useMinMax(); - if (useMinMax) { - min = hashMultiSet.min(); - max = hashMultiSet.max(); - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + /* + * Get our Single-Column Long hash multi-set information for this specialized class. + */ + + hashMultiSet = (VectorMapJoinLongHashMultiSet) vectorMapJoinHashTable; + useMinMax = hashMultiSet.useMinMax(); + if (useMinMax) { + min = hashMultiSet.min(); + max = hashMultiSet.max(); + } + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner big-only join. 
@@ -153,11 +144,7 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java index 866aa60..f587517 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Multi-Key hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet; @@ -109,45 +108,40 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; + protected void commonSetup() throws HiveException { + super.commonSetup(); - alias = (byte) tag; + /* + * Initialize Multi-Key members for this specialized class. + */ - if (needCommonSetup) { - // Our one time process method initialization. 
- commonSetup(batch); - - /* - * Initialize Multi-Key members for this specialized class. - */ + keyVectorSerializeWrite = new VectorSerializeRow( + new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); + keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); - keyVectorSerializeWrite = new VectorSerializeRow( - new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); - keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); + currentKeyOutput = new Output(); + saveKeyOutput = new Output(); + } - currentKeyOutput = new Output(); - saveKeyOutput = new Output(); + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - needCommonSetup = false; - } + /* + * Get our Single-Column Long hash multi-set information for this specialized class. + */ - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + /* + * Get our Multi-Key hash multi-set information for this specialized class. + */ - /* - * Get our Multi-Key hash multi-set information for this specialized class. - */ - - hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable; + hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner big-only join. 
@@ -160,11 +154,7 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java index a0c3b9c..e373db1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column String hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet; @@ -98,40 +97,31 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Single-Column String members for this specialized class. 
- */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + /* + * Initialize Single-Column String members for this specialized class. + */ - needCommonSetup = false; - } + singleJoinColumn = bigTableKeyColumnMap[0]; + } - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - /* - * Get our Single-Column String hash multi-set information for this specialized class. - */ + /* + * Get our Single-Column String hash multi-set information for this specialized class. + */ - hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable; + hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner big-only join. 
@@ -144,11 +134,7 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java index ea2c04d..dc5d046 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java @@ -108,26 +108,26 @@ public abstract class VectorMapJoinInnerGenerateResultOperator /* * Setup our inner join specific members. */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); // Inner join specific. 
VectorMapJoinHashMap baseHashMap = (VectorMapJoinHashMap) vectorMapJoinHashTable; - hashMapResults = new VectorMapJoinHashMapResult[batch.DEFAULT_SIZE]; + hashMapResults = new VectorMapJoinHashMapResult[VectorizedRowBatch.DEFAULT_SIZE]; for (int i = 0; i < hashMapResults.length; i++) { hashMapResults[i] = baseHashMap.createHashMapResult(); } - allMatchs = new int[batch.DEFAULT_SIZE]; + allMatchs = new int[VectorizedRowBatch.DEFAULT_SIZE]; - equalKeySeriesHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesIsSingleValue = new boolean[batch.DEFAULT_SIZE]; - equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; + equalKeySeriesHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesAllMatchIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesIsSingleValue = new boolean[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesDuplicateCounts = new int[VectorizedRowBatch.DEFAULT_SIZE]; - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; + spills = new int[VectorizedRowBatch.DEFAULT_SIZE]; + spillHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; } /* @@ -142,7 +142,7 @@ public abstract class VectorMapJoinInnerGenerateResultOperator // For join operators that can generate small table results, reset their // (target) scratch columns. 
- for (int column : smallTableOutputVectorColumns) { + for (int column : smallTableValueColumnMap) { ColumnVector smallTableColumn = batch.cols[column]; smallTableColumn.reset(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java index 36404bc..5ac606a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column Long hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap; @@ -101,45 +100,36 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); - - /* - * Initialize Single-Column Long members for this specialized class. - */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + protected void commonSetup() throws HiveException { + super.commonSetup(); - needCommonSetup = false; - } - - if (needHashTableSetup) { - // Setup our hash table specialization. 
It will be the first time the process - // method is called, or after a Hybrid Grace reload. + /* + * Initialize Single-Column Long members for this specialized class. + */ - /* - * Get our Single-Column Long hash map information for this specialized class. - */ + singleJoinColumn = bigTableKeyColumnMap[0]; + } - hashMap = (VectorMapJoinLongHashMap) vectorMapJoinHashTable; - useMinMax = hashMap.useMinMax(); - if (useMinMax) { - min = hashMap.min(); - max = hashMap.max(); - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + /* + * Get our Single-Column Long hash map information for this specialized class. + */ + + hashMap = (VectorMapJoinLongHashMap) vectorMapJoinHashTable; + useMinMax = hashMap.useMinMax(); + if (useMinMax) { + min = hashMap.min(); + max = hashMap.max(); + } + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner join. 
@@ -151,11 +141,7 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java index 620101f..cdee3fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Multi-Key hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; @@ -107,45 +106,36 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Multi-Key members for this specialized class. 
- */ - - keyVectorSerializeWrite = new VectorSerializeRow( - new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); - keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); + /* + * Initialize Multi-Key members for this specialized class. + */ - currentKeyOutput = new Output(); - saveKeyOutput = new Output(); + keyVectorSerializeWrite = new VectorSerializeRow( + new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); + keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); - needCommonSetup = false; - } + currentKeyOutput = new Output(); + saveKeyOutput = new Output(); + } - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - /* - * Get our Multi-Key hash map information for this specialized class. - */ + /* + * Get our Multi-Key hash map information for this specialized class. + */ - hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner join. 
@@ -157,11 +147,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java index d99d514..8e6697e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column String hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; @@ -97,40 +96,31 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Single-Column String members for this specialized class. 
- */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + /* + * Initialize Single-Column String members for this specialized class. + */ - needCommonSetup = false; - } + singleJoinColumn = bigTableKeyColumnMap[0]; + } - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - /* - * Get our Single-Column String hash map information for this specialized class. - */ + /* + * Get our Single-Column String hash map information for this specialized class. + */ - hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an inner join. @@ -142,11 +132,7 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java index f68d4c4..71ec56b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java @@ -89,21 
+89,21 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator /* * Setup our left semi join specific members. */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); // Semi join specific. VectorMapJoinHashSet baseHashSet = (VectorMapJoinHashSet) vectorMapJoinHashTable; - hashSetResults = new VectorMapJoinHashSetResult[batch.DEFAULT_SIZE]; + hashSetResults = new VectorMapJoinHashSetResult[VectorizedRowBatch.DEFAULT_SIZE]; for (int i = 0; i < hashSetResults.length; i++) { hashSetResults[i] = baseHashSet.createHashSetResult(); } - allMatchs = new int[batch.DEFAULT_SIZE]; + allMatchs = new int[VectorizedRowBatch.DEFAULT_SIZE]; - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; + spills = new int[VectorizedRowBatch.DEFAULT_SIZE]; + spillHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; } //----------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java index 4185c5b..40e7cfa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; import 
org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column Long hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet; @@ -102,45 +101,36 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); - - /* - * Initialize Single-Column Long members for this specialized class. - */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + protected void commonSetup() throws HiveException { + super.commonSetup(); - needCommonSetup = false; - } - - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + /* + * Initialize Single-Column Long members for this specialized class. + */ - /* - * Get our Single-Column Long hash set information for this specialized class. - */ + singleJoinColumn = bigTableKeyColumnMap[0]; + } - hashSet = (VectorMapJoinLongHashSet) vectorMapJoinHashTable; - useMinMax = hashSet.useMinMax(); - if (useMinMax) { - min = hashSet.min(); - max = hashSet.max(); - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + /* + * Get our Single-Column Long hash set information for this specialized class. + */ + + hashSet = (VectorMapJoinLongHashSet) vectorMapJoinHashTable; + useMinMax = hashSet.useMinMax(); + if (useMinMax) { + min = hashSet.min(); + max = hashSet.max(); + } + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an left semi join. 
@@ -153,11 +143,7 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java index 541e7fa..e5d9fda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Multi-Key hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet; @@ -108,45 +107,36 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Multi-Key members for this specialized class. 
- */ - - keyVectorSerializeWrite = new VectorSerializeRow( - new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); - keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); + /* + * Initialize Multi-Key members for this specialized class. + */ - currentKeyOutput = new Output(); - saveKeyOutput = new Output(); + keyVectorSerializeWrite = new VectorSerializeRow( + new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); + keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); - needCommonSetup = false; - } + currentKeyOutput = new Output(); + saveKeyOutput = new Output(); + } - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - /* - * Get our Multi-Key hash set information for this specialized class. - */ + /* + * Get our Multi-Key hash set information for this specialized class. + */ - hashSet = (VectorMapJoinBytesHashSet) vectorMapJoinHashTable; + hashSet = (VectorMapJoinBytesHashSet) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an left semi join. @@ -159,11 +149,7 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; }