http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java index 6785bce..df900a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column String hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet; @@ -98,40 +97,31 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Single-Column String members for this specialized class. - */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + /* + * Initialize Single-Column String members for this specialized class. + */ - needCommonSetup = false; - } + singleJoinColumn = bigTableKeyColumnMap[0]; + } - if (needHashTableSetup) { - // Setup our hash table specialization. 
It will be the first time the process - // method is called, or after a Hybrid Grace reload. + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - /* - * Get our Single-Column String hash set information for this specialized class. - */ + /* + * Get our Single-Column String hash set information for this specialized class. + */ - hashSet = (VectorMapJoinBytesHashSet) vectorMapJoinHashTable; + hashSet = (VectorMapJoinBytesHashSet) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { // Do the per-batch setup for an left semi join. @@ -144,11 +134,7 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe } final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } return; }
http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java index 2e5c568..61bcbf0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java @@ -24,13 +24,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMap; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator; import org.apache.hadoop.hive.ql.metadata.HiveException; import 
org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; @@ -131,32 +137,34 @@ public abstract class VectorMapJoinOuterGenerateResultOperator /* * Setup our outer join specific members. */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); // Outer join specific. VectorMapJoinHashMap baseHashMap = (VectorMapJoinHashMap) vectorMapJoinHashTable; - hashMapResults = new VectorMapJoinHashMapResult[batch.DEFAULT_SIZE]; + hashMapResults = new VectorMapJoinHashMapResult[VectorizedRowBatch.DEFAULT_SIZE]; for (int i = 0; i < hashMapResults.length; i++) { hashMapResults[i] = baseHashMap.createHashMapResult(); } - inputSelected = new int[batch.DEFAULT_SIZE]; + inputSelected = new int[VectorizedRowBatch.DEFAULT_SIZE]; - allMatchs = new int[batch.DEFAULT_SIZE]; + allMatchs = new int[VectorizedRowBatch.DEFAULT_SIZE]; - equalKeySeriesHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesIsSingleValue = new boolean[batch.DEFAULT_SIZE]; - equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; + equalKeySeriesHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesAllMatchIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesIsSingleValue = new boolean[VectorizedRowBatch.DEFAULT_SIZE]; + equalKeySeriesDuplicateCounts = new int[VectorizedRowBatch.DEFAULT_SIZE]; - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; + spills = new int[VectorizedRowBatch.DEFAULT_SIZE]; + spillHashMapResultIndices = new int[VectorizedRowBatch.DEFAULT_SIZE]; - nonSpills = new int[batch.DEFAULT_SIZE]; - noMatchs = new int[batch.DEFAULT_SIZE]; - merged = new int[batch.DEFAULT_SIZE]; + nonSpills = new int[VectorizedRowBatch.DEFAULT_SIZE]; + noMatchs = new 
int[VectorizedRowBatch.DEFAULT_SIZE]; + merged = new int[VectorizedRowBatch.DEFAULT_SIZE]; + + matchTracker = null; } @@ -174,15 +182,16 @@ public abstract class VectorMapJoinOuterGenerateResultOperator // For join operators that can generate small table results, reset their // (target) scratch columns. - for (int column : smallTableOutputVectorColumns) { + for (int column : outerSmallTableKeyColumnMap) { + ColumnVector bigTableOuterKeyColumn = batch.cols[column]; + bigTableOuterKeyColumn.reset(); + } + + for (int column : smallTableValueColumnMap) { ColumnVector smallTableColumn = batch.cols[column]; smallTableColumn.reset(); } - for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector bigTableOuterKeyColumn = batch.cols[column]; - bigTableOuterKeyColumn.reset(); - } } /** @@ -569,27 +578,28 @@ public abstract class VectorMapJoinOuterGenerateResultOperator protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, int noMatchSize) throws IOException, HiveException { - // Set null information in the small table results area. + // Set null information in the small table results area. - for (int i = 0; i < noMatchSize; i++) { - int batchIndex = noMatchs[i]; + for (int i = 0; i < noMatchSize; i++) { + int batchIndex = noMatchs[i]; - // Mark any scratch small table scratch columns that would normally receive a copy of the - // key as null, too. - for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } + // Mark any scratch small table scratch columns that would normally receive a copy of the + // key as null, too. + // + for (int column : outerSmallTableKeyColumnMap) { + ColumnVector colVector = batch.cols[column]; + colVector.noNulls = false; + colVector.isNull[batchIndex] = true; + } - // Small table values are set to null. 
- for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - } - } + // Small table values are set to null. + for (int column : smallTableValueColumnMap) { + ColumnVector colVector = batch.cols[column]; + colVector.noNulls = false; + colVector.isNull[batchIndex] = true; + } + } + } /** * Generate the outer join output results for one vectorized row batch with a repeated key. @@ -734,20 +744,310 @@ public abstract class VectorMapJoinOuterGenerateResultOperator */ protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { - for (int column : smallTableOutputVectorColumns) { + // Mark any scratch small table scratch columns that would normally receive a copy of the + // key as null, too. + // + for (int column : outerSmallTableKeyColumnMap) { ColumnVector colVector = batch.cols[column]; colVector.noNulls = false; colVector.isNull[0] = true; colVector.isRepeating = true; } - // Mark any scratch small table scratch columns that would normally receive a copy of the key - // as null, too. - for (int column : bigTableOuterKeyOutputVectorColumns) { + for (int column : smallTableValueColumnMap) { ColumnVector colVector = batch.cols[column]; colVector.noNulls = false; colVector.isNull[0] = true; colVector.isRepeating = true; } } + + private void markBigTableColumnsAsNullRepeating() { + + /* + * For non-match FULL OUTER Small Table results, the Big Table columns are all NULL. + */ + for (int column : bigTableRetainColumnMap) { + ColumnVector colVector = overflowBatch.cols[column]; + colVector.isRepeating = true; + colVector.noNulls = false; + colVector.isNull[0] = true; + } + } + + /* + * For FULL OUTER MapJoin, find the non matched Small Table keys and values and add them to the + * join output result. 
+ */ + @Override + protected void generateFullOuterSmallTableNoMatches(byte smallTablePos, + MapJoinTableContainer substituteSmallTable) throws HiveException { + + /* + * For dynamic partition hash join, both the Big Table and Small Table are partitioned (sent) + * to the Reducer using the key hash code. So, we can generate the non-match Small Table + * results locally. + * + * Scan the Small Table for keys that didn't match and generate the non-matches into the + * overflowBatch. + */ + + /* + * If there were no matched keys sent, we need to do our common initialization. + */ + if (needCommonSetup) { + + // Our one time process method initialization. + commonSetup(); + + needCommonSetup = false; + } + + if (needHashTableSetup) { + + // Setup our hash table specialization. It will be the first time the process + // method is called, or after a Hybrid Grace reload. + + hashTableSetup(); + + needHashTableSetup = false; + } + + /* + * To support fancy NULL repeating columns, let's flush the overflowBatch if it has anything. + */ + if (overflowBatch.size > 0) { + forwardOverflow(); + } + markBigTableColumnsAsNullRepeating(); + + switch (hashTableKeyType) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + generateFullOuterLongKeySmallTableNoMatches(); + break; + case STRING: + generateFullOuterStringKeySmallTableNoMatches(); + break; + case MULTI_KEY: + generateFullOuterMultiKeySmallTableNoMatches(); + break; + default: + throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType); + } + } + + /* + * For FULL OUTER MapJoin, find the non matched Small Table Long keys and values and add them to + * the join output result. 
+ */ + protected void generateFullOuterLongKeySmallTableNoMatches() + throws HiveException { + + final LongColumnVector singleSmallTableKeyOutputColumnVector; + if (allSmallTableKeyColumnIncluded[0]) { + singleSmallTableKeyOutputColumnVector = + (LongColumnVector) overflowBatch.cols[allSmallTableKeyColumnNums[0]]; + } else { + singleSmallTableKeyOutputColumnVector = null; + } + + VectorMapJoinLongHashMap hashMap = (VectorMapJoinLongHashMap) vectorMapJoinHashTable; + + VectorMapJoinNonMatchedIterator nonMatchedIterator = + hashMap.createNonMatchedIterator(matchTracker); + nonMatchedIterator.init(); + while (nonMatchedIterator.findNextNonMatched()) { + + final long longKey; + boolean isKeyNull = !nonMatchedIterator.readNonMatchedLongKey(); + if (!isKeyNull) { + longKey = nonMatchedIterator.getNonMatchedLongKey(); + } else { + longKey = 0; + } + + VectorMapJoinHashMapResult hashMapResult = nonMatchedIterator.getNonMatchedHashMapResult(); + + ByteSegmentRef byteSegmentRef = hashMapResult.first(); + while (byteSegmentRef != null) { + + // NOTE: Big Table result columns were marked repeating NULL already. 
+ + if (singleSmallTableKeyOutputColumnVector != null) { + if (isKeyNull) { + singleSmallTableKeyOutputColumnVector.isNull[overflowBatch.size] = true; + singleSmallTableKeyOutputColumnVector.noNulls = false; + } else { + singleSmallTableKeyOutputColumnVector.vector[overflowBatch.size] = longKey; + singleSmallTableKeyOutputColumnVector.isNull[overflowBatch.size] = false; + } + } + + if (smallTableValueVectorDeserializeRow != null) { + + doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, + byteSegmentRef, hashMapResult); + } + + overflowBatch.size++; + if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { + forwardOverflow(); + markBigTableColumnsAsNullRepeating(); + } + byteSegmentRef = hashMapResult.next(); + } + } + } + + private void doSmallTableKeyDeserializeRow(VectorizedRowBatch batch, int batchIndex, + byte[] keyBytes, int keyOffset, int keyLength) + throws HiveException { + + smallTableKeyOuterVectorDeserializeRow.setBytes(keyBytes, keyOffset, keyLength); + + try { + // Our hash tables are immutable. We can safely do by reference STRING, CHAR/VARCHAR, etc. + smallTableKeyOuterVectorDeserializeRow.deserializeByRef(batch, batchIndex); + } catch (Exception e) { + throw new HiveException( + "\nDeserializeRead detail: " + + smallTableKeyOuterVectorDeserializeRow.getDetailedReadPositionString(), + e); + } + } + + /* + * For FULL OUTER MapJoin, find the non matched Small Table Multi-Keys and values and add them to + * the join output result. 
+ */ + protected void generateFullOuterMultiKeySmallTableNoMatches() throws HiveException { + + VectorMapJoinBytesHashMap hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + + VectorMapJoinNonMatchedIterator nonMatchedIterator = + hashMap.createNonMatchedIterator(matchTracker); + nonMatchedIterator.init(); + while (nonMatchedIterator.findNextNonMatched()) { + + nonMatchedIterator.readNonMatchedBytesKey(); + byte[] keyBytes = nonMatchedIterator.getNonMatchedBytes(); + final int keyOffset = nonMatchedIterator.getNonMatchedBytesOffset(); + final int keyLength = nonMatchedIterator.getNonMatchedBytesLength(); + + VectorMapJoinHashMapResult hashMapResult = nonMatchedIterator.getNonMatchedHashMapResult(); + + ByteSegmentRef byteSegmentRef = hashMapResult.first(); + while (byteSegmentRef != null) { + + // NOTE: Big Table result columns were marked repeating NULL already. + + if (smallTableKeyOuterVectorDeserializeRow != null) { + doSmallTableKeyDeserializeRow(overflowBatch, overflowBatch.size, + keyBytes, keyOffset, keyLength); + } + + if (smallTableValueVectorDeserializeRow != null) { + + doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, + byteSegmentRef, hashMapResult); + } + + overflowBatch.size++; + if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { + forwardOverflow(); + markBigTableColumnsAsNullRepeating(); + } + byteSegmentRef = hashMapResult.next(); + } + } + + // NOTE: We don't have to deal with FULL OUTER All-NULL key values like we do for single-column + // LONG and STRING because we do store them in the hash map... + } + + /* + * For FULL OUTER MapJoin, find the non matched Small Table String keys and values and add them to + * the join output result. 
+ */ + protected void generateFullOuterStringKeySmallTableNoMatches() throws HiveException { + + final BytesColumnVector singleSmallTableKeyOutputColumnVector; + if (allSmallTableKeyColumnIncluded[0]) { + singleSmallTableKeyOutputColumnVector = + (BytesColumnVector) overflowBatch.cols[allSmallTableKeyColumnNums[0]]; + } else { + singleSmallTableKeyOutputColumnVector = null; + } + + VectorMapJoinBytesHashMap hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + + VectorMapJoinNonMatchedIterator nonMatchedIterator = + hashMap.createNonMatchedIterator(matchTracker); + nonMatchedIterator.init(); + while (nonMatchedIterator.findNextNonMatched()) { + + final byte[] keyBytes; + final int keyOffset; + final int keyLength; + boolean isKeyNull = !nonMatchedIterator.readNonMatchedBytesKey(); + if (!isKeyNull) { + keyBytes = nonMatchedIterator.getNonMatchedBytes(); + keyOffset = nonMatchedIterator.getNonMatchedBytesOffset(); + keyLength = nonMatchedIterator.getNonMatchedBytesLength(); + } else { + keyBytes = null; + keyOffset = 0; + keyLength = 0; + } + + VectorMapJoinHashMapResult hashMapResult = nonMatchedIterator.getNonMatchedHashMapResult(); + + ByteSegmentRef byteSegmentRef = hashMapResult.first(); + while (byteSegmentRef != null) { + + // NOTE: Big Table result columns were marked repeating NULL already. 
+ + if (singleSmallTableKeyOutputColumnVector != null) { + if (isKeyNull) { + singleSmallTableKeyOutputColumnVector.isNull[overflowBatch.size] = true; + singleSmallTableKeyOutputColumnVector.noNulls = false; + } else { + singleSmallTableKeyOutputColumnVector.setVal( + overflowBatch.size, + keyBytes, keyOffset, keyLength); + singleSmallTableKeyOutputColumnVector.isNull[overflowBatch.size] = false; + } + } + + if (smallTableValueVectorDeserializeRow != null) { + + doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, + byteSegmentRef, hashMapResult); + } + + overflowBatch.size++; + if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { + forwardOverflow(); + markBigTableColumnsAsNullRepeating(); + } + byteSegmentRef = hashMapResult.next(); + } + } + } + + protected void fullOuterHashTableSetup() { + + // Always track key matches for FULL OUTER. + matchTracker = vectorMapJoinHashTable.createMatchTracker(); + + } + + protected void fullOuterIntersectHashTableSetup() { + + matchTracker = vectorMapJoinHashTable.createMatchTracker(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java index be05cc2..f3f5a36 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import 
org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column Long hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap; @@ -65,7 +64,7 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe //--------------------------------------------------------------------------- // The hash map for this specialized class. - private transient VectorMapJoinLongHashMap hashMap; + protected transient VectorMapJoinLongHashMap hashMap; //--------------------------------------------------------------------------- // Single-Column Long specific members. @@ -77,7 +76,7 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe private transient long max; // The column number for this one column join specialization. - private transient int singleJoinColumn; + protected transient int singleJoinColumn; //--------------------------------------------------------------------------- // Pass-thru constructors. @@ -102,55 +101,39 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; - - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + protected void commonSetup() throws HiveException { + super.commonSetup(); - /* - * Initialize Single-Column Long members for this specialized class. - */ - - singleJoinColumn = bigTableKeyColumnMap[0]; - - needCommonSetup = false; - } + /* + * Initialize Single-Column Long members for this specialized class. + */ - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. - - /* - * Get our Single-Column Long hash map information for this specialized class. 
- */ + singleJoinColumn = bigTableKeyColumnMap[0]; + } - hashMap = (VectorMapJoinLongHashMap) vectorMapJoinHashTable; - useMinMax = hashMap.useMinMax(); - if (useMinMax) { - min = hashMap.min(); - max = hashMap.max(); - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); + + /* + * Get our Single-Column Long hash map information for this specialized class. + */ + + hashMap = (VectorMapJoinLongHashMap) vectorMapJoinHashTable; + useMinMax = hashMap.useMinMax(); + if (useMinMax) { + min = hashMap.min(); + max = hashMap.max(); + } + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } - return; - } - // Do the per-batch setup for an outer join. outerPerBatchSetup(batch); @@ -160,9 +143,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe // later. 
boolean inputSelectedInUse = batch.selectedInUse; if (inputSelectedInUse) { - // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { - // throw new HiveException("batch.selected is not in sort order and unique"); - // } System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); } @@ -174,19 +154,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe ve.evaluate(batch); } someRowsFilteredOut = (batch.size != inputLogicalSize); - if (LOG.isDebugEnabled()) { - if (batch.selectedInUse) { - if (inputSelectedInUse) { - LOG.debug(CLASS_NAME + - " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } else { - LOG.debug(CLASS_NAME + - " inputLogicalSize " + inputLogicalSize + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } - } - } } // Perform any key expressions. Results will go into scratch columns. @@ -234,12 +201,11 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe } else { // Handle *repeated* join key, if found. long key = vector[0]; - // LOG.debug(CLASS_NAME + " repeated key " + key); if (useMinMax && (key < min || key > max)) { // Out of range for whole batch. joinResult = JoinUtil.JoinResult.NOMATCH; } else { - joinResult = hashMap.lookup(key, hashMapResults[0]); + joinResult = hashMap.lookup(key, hashMapResults[0], matchTracker); } } @@ -247,9 +213,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe * Common repeated join result processing. 
*/ - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); - } finishOuterRepeated(batch, joinResult, hashMapResults[0], someRowsFilteredOut, inputSelectedInUse, inputLogicalSize); } else { @@ -258,10 +221,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe * NOT Repeating. */ - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated"); - } - int selected[] = batch.selected; boolean selectedInUse = batch.selectedInUse; @@ -286,8 +245,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe for (int logical = 0; logical < batch.size; logical++) { int batchIndex = (selectedInUse ? selected[logical] : logical); - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch"); - /* * Single-Column Long outer null detection. */ @@ -305,7 +262,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe atLeastOneNonMatch = true; - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL"); } else { /* @@ -354,11 +310,10 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe // Key out of range for whole hash table. saveJoinResult = JoinUtil.JoinResult.NOMATCH; } else { - saveJoinResult = hashMap.lookup(currentKey, hashMapResults[hashMapResultCount]); + saveJoinResult = hashMap.lookup(currentKey, hashMapResults[hashMapResultCount], + matchTracker); } - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " New Key " + currentKey + " " + saveJoinResult.name()); - /* * Common outer join result processing. 
*/ @@ -370,7 +325,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe equalKeySeriesIsSingleValue[equalKeySeriesCount] = hashMapResults[hashMapResultCount].isSingleRow(); equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); break; case SPILL: @@ -381,11 +335,9 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe case NOMATCH: atLeastOneNonMatch = true; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); break; } } else { - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); // Series of equal keys. @@ -393,7 +345,6 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe case MATCH: equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); break; case SPILL: @@ -403,13 +354,9 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe break; case NOMATCH: - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); break; } } - // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { - // throw new HiveException("allMatchs is not in sort order and unique"); - // } } } @@ -451,7 +398,9 @@ public class VectorMapJoinOuterLongOperator extends VectorMapJoinOuterGenerateRe } if (batch.size > 0) { - // Forward any remaining selected rows. + + // Forward any rows in the Big Table batch that had results added (they will be selected). + // NOTE: Other result rows may have been generated in the overflowBatch. 
forwardBigTableBatch(batch); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java index 70f88e3..29c531b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Multi-Key hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; @@ -69,17 +68,17 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera //--------------------------------------------------------------------------- // The hash map for this specialized class. - private transient VectorMapJoinBytesHashMap hashMap; + protected transient VectorMapJoinBytesHashMap hashMap; //--------------------------------------------------------------------------- // Multi-Key specific members. // // Object that can take a set of columns in row in a vectorized row batch and serialized it. - private transient VectorSerializeRow keyVectorSerializeWrite; + protected transient VectorSerializeRow keyVectorSerializeWrite; // The BinarySortable serialization of the current key. 
- private transient Output currentKeyOutput; + protected transient Output currentKeyOutput; // The BinarySortable serialization of the saved key for a possible series of equal keys. private transient Output saveKeyOutput; @@ -107,55 +106,40 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; + protected void commonSetup() throws HiveException { + super.commonSetup(); - alias = (byte) tag; + /* + * Initialize Multi-Key members for this specialized class. + */ - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + keyVectorSerializeWrite = new VectorSerializeRow( + new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); + keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); - /* - * Initialize Multi-Key members for this specialized class. - */ - - keyVectorSerializeWrite = new VectorSerializeRow( - new BinarySortableSerializeWrite(bigTableKeyColumnMap.length)); - keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap); - - currentKeyOutput = new Output(); - saveKeyOutput = new Output(); + currentKeyOutput = new Output(); + saveKeyOutput = new Output(); + } - needCommonSetup = false; - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + /* + * Get our Multi-Key hash map information for this specialized class. + */ - /* - * Get our Multi-Key hash map information for this specialized class. 
- */ + hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; - hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } - return; - } - // Do the per-batch setup for an outer join. outerPerBatchSetup(batch); @@ -165,9 +149,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera // later. boolean inputSelectedInUse = batch.selectedInUse; if (inputSelectedInUse) { - // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { - // throw new HiveException("batch.selected is not in sort order and unique"); - // } System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); } @@ -179,19 +160,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera ve.evaluate(batch); } someRowsFilteredOut = (batch.size != inputLogicalSize); - if (LOG.isDebugEnabled()) { - if (batch.selectedInUse) { - if (inputSelectedInUse) { - LOG.debug(CLASS_NAME + - " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } else { - LOG.debug(CLASS_NAME + - " inputLogicalSize " + inputLogicalSize + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } - } - } } // Perform any key expressions. Results will go into scratch columns. 
@@ -259,16 +227,13 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera keyVectorSerializeWrite.serializeWrite(batch, 0); byte[] keyBytes = currentKeyOutput.getData(); int keyLength = currentKeyOutput.getLength(); - joinResult = hashMap.lookup(keyBytes, 0, keyLength, hashMapResults[0]); + joinResult = hashMap.lookup(keyBytes, 0, keyLength, hashMapResults[0], matchTracker); } /* * Common repeated join result processing. */ - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); - } finishOuterRepeated(batch, joinResult, hashMapResults[0], someRowsFilteredOut, inputSelectedInUse, inputLogicalSize); } else { @@ -277,10 +242,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera * NOT Repeating. */ - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated"); - } - int selected[] = batch.selected; boolean selectedInUse = batch.selectedInUse; @@ -305,8 +266,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera for (int logical = 0; logical < batch.size; logical++) { int batchIndex = (selectedInUse ? selected[logical] : logical); - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch"); - /* * Multi-Key outer null detection. 
*/ @@ -325,7 +284,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera atLeastOneNonMatch = true; - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL"); } else { /* @@ -375,7 +333,9 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera byte[] keyBytes = saveKeyOutput.getData(); int keyLength = saveKeyOutput.getLength(); - saveJoinResult = hashMap.lookup(keyBytes, 0, keyLength, hashMapResults[hashMapResultCount]); + saveJoinResult = hashMap.lookup(keyBytes, 0, keyLength, + hashMapResults[hashMapResultCount], matchTracker); + /* * Common outer join result processing. @@ -388,7 +348,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera equalKeySeriesIsSingleValue[equalKeySeriesCount] = hashMapResults[hashMapResultCount].isSingleRow(); equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); break; case SPILL: @@ -399,11 +358,9 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera case NOMATCH: atLeastOneNonMatch = true; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); break; } } else { - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); // Series of equal keys. 
@@ -411,7 +368,6 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera case MATCH: equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); break; case SPILL: @@ -421,13 +377,9 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera break; case NOMATCH: - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); break; } } - // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { - // throw new HiveException("allMatchs is not in sort order and unique"); - // } } } @@ -469,7 +421,9 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera } if (batch.size > 0) { - // Forward any remaining selected rows. + + // Forward any rows in the Big Table batch that had results added (they will be selected). + // NOTE: Other result rows may have been generated in the overflowBatch. 
forwardBigTableBatch(batch); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java index 714f5ec..a19941a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; - import org.apache.hadoop.hive.ql.plan.VectorDesc; // Single-Column String hash table import. import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; @@ -65,14 +64,14 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate //--------------------------------------------------------------------------- // The hash map for this specialized class. - private transient VectorMapJoinBytesHashMap hashMap; + protected transient VectorMapJoinBytesHashMap hashMap; //--------------------------------------------------------------------------- // Single-Column String specific members. // // The column number for this one column join specialization. - private transient int singleJoinColumn; + protected transient int singleJoinColumn; //--------------------------------------------------------------------------- // Pass-thru constructors. 
@@ -97,50 +96,35 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate // @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - alias = (byte) tag; + protected void commonSetup() throws HiveException { + super.commonSetup(); - if (needCommonSetup) { - // Our one time process method initialization. - commonSetup(batch); + /* + * Initialize Single-Column String members for this specialized class. + */ - /* - * Initialize Single-Column String members for this specialized class. - */ - - singleJoinColumn = bigTableKeyColumnMap[0]; + singleJoinColumn = bigTableKeyColumnMap[0]; + } - needCommonSetup = false; - } + @Override + public void hashTableSetup() throws HiveException { + super.hashTableSetup(); - if (needHashTableSetup) { - // Setup our hash table specialization. It will be the first time the process - // method is called, or after a Hybrid Grace reload. + /* + * Get our Single-Column String hash map information for this specialized class. + */ - /* - * Get our Single-Column String hash map information for this specialized class. - */ + hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; - hashMap = (VectorMapJoinBytesHashMap) vectorMapJoinHashTable; + } - needHashTableSetup = false; - } + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { - batchCounter++; + try { final int inputLogicalSize = batch.size; - if (inputLogicalSize == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } - return; - } - // Do the per-batch setup for an outer join. outerPerBatchSetup(batch); @@ -150,33 +134,17 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate // later. 
boolean inputSelectedInUse = batch.selectedInUse; if (inputSelectedInUse) { - // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { - // throw new HiveException("batch.selected is not in sort order and unique"); - // } System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); } // Filtering for outer join just removes rows available for hash table matching. - boolean someRowsFilteredOut = false; + boolean someRowsFilteredOut = false; if (bigTableFilterExpressions.length > 0) { // Since the input for (VectorExpression ve : bigTableFilterExpressions) { ve.evaluate(batch); } someRowsFilteredOut = (batch.size != inputLogicalSize); - if (LOG.isDebugEnabled()) { - if (batch.selectedInUse) { - if (inputSelectedInUse) { - LOG.debug(CLASS_NAME + - " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } else { - LOG.debug(CLASS_NAME + - " inputLogicalSize " + inputLogicalSize + - " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); - } - } - } } // Perform any key expressions. Results will go into scratch columns. @@ -228,7 +196,8 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate byte[] keyBytes = vector[0]; int keyStart = start[0]; int keyLength = length[0]; - joinResult = hashMap.lookup(keyBytes, keyStart, keyLength, hashMapResults[0]); + joinResult = hashMap.lookup( + keyBytes, keyStart, keyLength, hashMapResults[0], matchTracker); } /* @@ -246,10 +215,6 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate * NOT Repeating. 
*/ - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated"); - } - int selected[] = batch.selected; boolean selectedInUse = batch.selectedInUse; @@ -274,8 +239,6 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate for (int logical = 0; logical < batch.size; logical++) { int batchIndex = (selectedInUse ? selected[logical] : logical); - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch"); - /* * Single-Column String outer null detection. */ @@ -293,7 +256,6 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate atLeastOneNonMatch = true; - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL"); } else { /* @@ -343,7 +305,8 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate byte[] keyBytes = vector[batchIndex]; int keyStart = start[batchIndex]; int keyLength = length[batchIndex]; - saveJoinResult = hashMap.lookup(keyBytes, keyStart, keyLength, hashMapResults[hashMapResultCount]); + saveJoinResult = hashMap.lookup(keyBytes, keyStart, keyLength, + hashMapResults[hashMapResultCount], matchTracker); /* * Common outer join result processing. 
@@ -356,7 +319,6 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate equalKeySeriesIsSingleValue[equalKeySeriesCount] = hashMapResults[hashMapResultCount].isSingleRow(); equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); break; case SPILL: @@ -367,11 +329,9 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate case NOMATCH: atLeastOneNonMatch = true; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); break; } } else { - // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); // Series of equal keys. @@ -379,7 +339,6 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate case MATCH: equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; allMatchs[allMatchCount++] = batchIndex; - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); break; case SPILL: @@ -389,13 +348,9 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate break; case NOMATCH: - // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); break; } } - // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { - // throw new HiveException("allMatchs is not in sort order and unique"); - // } } } @@ -437,7 +392,9 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate } if (batch.size > 0) { - // Forward any remaining selected rows. + + // Forward any rows in the Big Table batch that had results added (they will be selected). + // NOTE: Other result rows may have been generated in the overflowBatch. 
forwardBigTableBatch(batch); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index 5969460..add8b9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -19,8 +19,13 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.WriteBuffers; +import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.common.util.HashCodeUtil; import org.slf4j.Logger; @@ -41,11 +46,112 @@ public abstract class VectorMapJoinFastBytesHashMap protected BytesWritable testValueBytesWritable; + private long fullOuterNullKeyRefWord; + + private static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator { + + private VectorMapJoinFastBytesHashMap hashMap; + + private boolean noMore; + private boolean keyIsNull; + + private WriteBuffers.Position nonMatchedReadPos; + + private ByteSegmentRef nonMatchedKeyByteSegmentRef; + + private 
VectorMapJoinFastBytesHashMapStore.HashMapResult nonMatchedHashMapResult; + + NonMatchedBytesHashMapIterator(MatchTracker matchTracker, + VectorMapJoinFastBytesHashMap hashMap) { + super(matchTracker); + this.hashMap = hashMap; + } + + @Override + public void init() { + super.init(); + noMore = false; + keyIsNull = false; + nonMatchedReadPos = new WriteBuffers.Position(); + nonMatchedKeyByteSegmentRef = new ByteSegmentRef(); + nonMatchedHashMapResult = new VectorMapJoinFastBytesHashMapStore.HashMapResult(); + } + + @Override + public boolean findNextNonMatched() { + if (noMore) { + return false; + } + while (true) { + nonMatchedLogicalSlotNum++; + if (nonMatchedLogicalSlotNum >= hashMap.logicalHashBucketCount) { + + // Fall below and handle Small Table NULL key. + break; + } + final long refWord = hashMap.slots[nonMatchedLogicalSlotNum]; + if (refWord != 0) { + if (!matchTracker.wasMatched(nonMatchedLogicalSlotNum)) { + nonMatchedHashMapResult.set(hashMap.hashMapStore, refWord); + keyIsNull = false; + return true; + } + } + } + + // Do we have a Small Table NULL Key? 
+ if (hashMap.fullOuterNullKeyRefWord == 0) { + return false; + } + nonMatchedHashMapResult.set(hashMap.hashMapStore, hashMap.fullOuterNullKeyRefWord); + noMore = true; + keyIsNull = true; + return true; + } + + @Override + public boolean readNonMatchedBytesKey() throws HiveException { + if (keyIsNull) { + return false; + } + hashMap.hashMapStore.getKey( + hashMap.slots[nonMatchedLogicalSlotNum], + nonMatchedKeyByteSegmentRef, + nonMatchedReadPos); + return true; + } + + @Override + public byte[] getNonMatchedBytes() { + return nonMatchedKeyByteSegmentRef.getBytes(); + } + + @Override + public int getNonMatchedBytesOffset() { + return (int) nonMatchedKeyByteSegmentRef.getOffset(); + } + + @Override + public int getNonMatchedBytesLength() { + return nonMatchedKeyByteSegmentRef.getLength(); + } + + @Override + public VectorMapJoinHashMapResult getNonMatchedHashMapResult() { + return nonMatchedHashMapResult; + } + } + @Override public VectorMapJoinHashMapResult createHashMapResult() { return new VectorMapJoinFastBytesHashMapStore.HashMapResult(); } + @Override + public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) { + return new NonMatchedBytesHashMapIterator(matchTracker, this); + } + public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) { if (resizeThreshold <= keysAssigned) { @@ -123,7 +229,28 @@ public abstract class VectorMapJoinFastBytesHashMap return fastHashMapResult.joinResult(); } - protected final void doHashMapMatch( + @Override + public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int keyLength, + VectorMapJoinHashMapResult hashMapResult, MatchTracker matchTracker) { + + VectorMapJoinFastBytesHashMapStore.HashMapResult fastHashMapResult = + (VectorMapJoinFastBytesHashMapStore.HashMapResult) hashMapResult; + + fastHashMapResult.forget(); + + long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength); + + final int slot = + doHashMapMatch( + keyBytes, 
keyStart, keyLength, hashCode, fastHashMapResult); + if (slot != -1 && matchTracker != null) { + matchTracker.trackMatch(slot); + } + + return fastHashMapResult.joinResult(); + } + + protected final int doHashMapMatch( byte[] keyBytes, int keyStart, int keyLength, long hashCode, VectorMapJoinFastBytesHashMapStore.HashMapResult fastHashMapResult) { @@ -138,7 +265,7 @@ public abstract class VectorMapJoinFastBytesHashMap if (refWord == 0) { // Given that we do not delete, an empty slot means no match. - return; + return -1; } else if ( VectorMapJoinFastBytesHashKeyRef.getPartialHashCodeFromRefWord(refWord) == partialHashCode) { @@ -148,22 +275,47 @@ public abstract class VectorMapJoinFastBytesHashMap fastHashMapResult.setKey(hashMapStore, refWord); if (fastHashMapResult.equalKey(keyBytes, keyStart, keyLength)) { fastHashMapResult.setMatch(); - return; + return slot; } } // Some other key (collision) - keep probing. probeSlot += (++i); if (i > largestNumberOfSteps) { // We know we never went that far when we were inserting. - return; + return -1; } slot = (int) (probeSlot & logicalHashBucketMask); } } + private static final byte[] EMPTY_BYTES = new byte[0]; + + public void addFullOuterNullKeyValue(BytesWritable currentValue) { + + byte[] valueBytes = currentValue.getBytes(); + int valueLength = currentValue.getLength(); + + if (fullOuterNullKeyRefWord == 0) { + fullOuterNullKeyRefWord = + hashMapStore.addFirst( + /* partialHashCode */ 0, EMPTY_BYTES, 0, 0, + valueBytes, 0, valueLength); + } else { + + // Add another value. 
+ fullOuterNullKeyRefWord = + hashMapStore.addMore( + fullOuterNullKeyRefWord, valueBytes, 0, valueLength, unsafeReadPos); + } + } + public VectorMapJoinFastBytesHashMap( + boolean isFullOuter, int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + fullOuterNullKeyRefWord = 0; hashMapStore = new VectorMapJoinFastBytesHashMapStore(writeBuffersSize); writeBuffers = hashMapStore.getWriteBuffers(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java index dda4a85..b71ebb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMapStore.java @@ -205,8 +205,7 @@ public class VectorMapJoinFastBytesHashMapStore implements MemoryEstimate { /** * Setup for reading the key of an entry with the equalKey method. * @param hashMapStore - * @param part1Word - * @param part2Word + * @param refWord */ public void setKey(VectorMapJoinFastBytesHashMapStore hashMapStore, long refWord) { @@ -280,6 +279,16 @@ public class VectorMapJoinFastBytesHashMapStore implements MemoryEstimate { setJoinResult(JoinResult.MATCH); } + /** + * Setup for a match outright. 
+ * @param hashMapStore + * @param refWord + */ + public void set(VectorMapJoinFastBytesHashMapStore hashMapStore, long refWord) { + setKey(hashMapStore, refWord); + setMatch(); + } + @Override public boolean hasRows() { return hasRows; @@ -546,6 +555,23 @@ public class VectorMapJoinFastBytesHashMapStore implements MemoryEstimate { return refWord; } + public void getKey(long refWord, ByteSegmentRef keyByteSegmentRef, + WriteBuffers.Position readPos) { + + final long absoluteOffset = KeyRef.getAbsoluteOffset(refWord); + + writeBuffers.setReadPoint(absoluteOffset, readPos); + + int keyLength = KeyRef.getSmallKeyLength(refWord); + boolean isKeyLengthSmall = (keyLength != KeyRef.SmallKeyLength.allBitsOn); + if (!isKeyLengthSmall) { + + // Read big key length we wrote with the key. + keyLength = writeBuffers.readVInt(readPos); + } + writeBuffers.getByteSegmentRefToCurrent(keyByteSegmentRef, keyLength, readPos); + } + public VectorMapJoinFastBytesHashMapStore(int writeBuffersSize) { writeBuffers = new WriteBuffers(writeBuffersSize, KeyRef.AbsoluteOffset.maxSize); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index 849eeb4..5ec90b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -162,8 +162,11 @@ public abstract class VectorMapJoinFastBytesHashMultiSet } public VectorMapJoinFastBytesHashMultiSet( + boolean isFullOuter, int initialCapacity, float loadFactor, int writeBuffersSize, long 
estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); hashMultiSetStore = new VectorMapJoinFastBytesHashMultiSetStore(writeBuffersSize); writeBuffers = hashMultiSetStore.getWriteBuffers(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 737b4d0..7c73aa6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -153,8 +153,11 @@ public abstract class VectorMapJoinFastBytesHashSet } public VectorMapJoinFastBytesHashSet( + boolean isFullOuter, int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); hashSetStore = new VectorMapJoinFastBytesHashSetStore(writeBuffersSize); writeBuffers = hashSetStore.getWriteBuffers(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index 223eec3..3d45a54 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -128,8 +128,11 @@ public abstract class VectorMapJoinFastBytesHashTable } public VectorMapJoinFastBytesHashTable( - int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + boolean isFullOuter, + int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); unsafeReadPos = new WriteBuffers.Position(); allocateBucketArray(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java index 3e91667..806e075 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMap.java @@ -31,8 +31,10 @@ public abstract class VectorMapJoinFastHashMap } public VectorMapJoinFastHashMap( - boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + boolean isFullOuter, + int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java index fa3d548..338a360 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashMultiSet.java @@ -41,8 +41,10 @@ public abstract class VectorMapJoinFastHashMultiSet } public VectorMapJoinFastHashMultiSet( - boolean isOuterJoin, - int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + boolean isFullOuter, + int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java index 4f90564..9fdcad7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashSet.java @@ -37,8 +37,10 @@ public abstract class VectorMapJoinFastHashSet } public VectorMapJoinFastHashSet( - boolean isOuterJoin, + boolean 
isFullOuter, int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + super( + isFullOuter, + initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index cbcc9b1..2d05eab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -22,15 +22,19 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator; public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTable { public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTable.class); + protected final boolean isFullOuter; + protected int logicalHashBucketCount; protected int logicalHashBucketMask; - protected float loadFactor; + protected final float loadFactor; protected final int writeBuffersSize; protected long estimatedKeyCount; @@ -69,7 +73,10 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab } public VectorMapJoinFastHashTable( - int 
initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { + boolean isFullOuter, + int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { + + this.isFullOuter = isFullOuter; initialCapacity = (Long.bitCount(initialCapacity) == 1) ? initialCapacity : nextHighestPowerOfTwo(initialCapacity); @@ -93,7 +100,27 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab @Override public long getEstimatedMemorySize() { + int size = 0; JavaDataModel jdm = JavaDataModel.get(); - return JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign()); + size += JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign()); + if (isFullOuter) { + size += MatchTracker.calculateEstimatedMemorySize(logicalHashBucketCount); + } + return size; + } + + @Override + public MatchTracker createMatchTracker() { + return MatchTracker.create(logicalHashBucketCount); + } + + @Override + public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) { + throw new RuntimeException("Not implemented"); + } + + @Override + public int spillPartitionId() { + throw new RuntimeException("Not implemented"); } } http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java index b6684e0..a7e9739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde2.WriteBuffers; +import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef; // Optimized for sequential key lookup. @@ -124,13 +125,11 @@ public class VectorMapJoinFastKeyStore implements MemoryEstimate { public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength, WriteBuffers.Position readPos) { - int storedKeyLengthLength = + int storedKeyLength = (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift); - boolean isKeyLengthSmall = (storedKeyLengthLength != SmallKeyLength.allBitsOn); + boolean isKeyLengthSmall = (storedKeyLength != SmallKeyLength.allBitsOn); - // LOG.debug("VectorMapJoinFastKeyStore equalKey keyLength " + keyLength + " isKeyLengthSmall " + isKeyLengthSmall + " storedKeyLengthLength " + storedKeyLengthLength + " keyRefWord " + Long.toHexString(keyRefWord)); - - if (isKeyLengthSmall && storedKeyLengthLength != keyLength) { + if (isKeyLengthSmall && storedKeyLength != keyLength) { return false; } long absoluteKeyOffset = @@ -139,16 +138,14 @@ public class VectorMapJoinFastKeyStore implements MemoryEstimate { writeBuffers.setReadPoint(absoluteKeyOffset, readPos); if (!isKeyLengthSmall) { // Read big value length we wrote with the value. - storedKeyLengthLength = writeBuffers.readVInt(readPos); - if (storedKeyLengthLength != keyLength) { - // LOG.debug("VectorMapJoinFastKeyStore equalKey no match big length"); + storedKeyLength = writeBuffers.readVInt(readPos); + if (storedKeyLength != keyLength) { return false; } } // Our reading is positioned to the key. 
if (!writeBuffers.isEqual(keyBytes, keyStart, readPos, keyLength)) { - // LOG.debug("VectorMapJoinFastKeyStore equalKey no match on bytes"); return false; } @@ -167,6 +164,25 @@ public class VectorMapJoinFastKeyStore implements MemoryEstimate { unsafeReadPos = new WriteBuffers.Position(); } + public void getKey(long keyRefWord, ByteSegmentRef keyByteSegmentRef, + WriteBuffers.Position readPos) { + + int storedKeyLength = + (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift); + boolean isKeyLengthSmall = (storedKeyLength != SmallKeyLength.allBitsOn); + + long absoluteKeyOffset = + (keyRefWord & AbsoluteKeyOffset.bitMask); + + writeBuffers.setReadPoint(absoluteKeyOffset, readPos); + if (!isKeyLengthSmall) { + + // Read big key length we wrote with the key. + storedKeyLength = writeBuffers.readVInt(readPos); + } + writeBuffers.getByteSegmentRefToCurrent(keyByteSegmentRef, storedKeyLength, readPos); + } + @Override public long getEstimatedMemorySize() { long size = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java index f42430d..a4cda92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java @@ -22,13 +22,17 @@ import java.io.IOException; import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +// import org.slf4j.Logger; +// import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; 
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; +import org.apache.hadoop.hive.serde2.WriteBuffers; +import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.common.util.HashCodeUtil; @@ -41,17 +45,115 @@ public class VectorMapJoinFastLongHashMap extends VectorMapJoinFastLongHashTable implements VectorMapJoinLongHashMap, MemoryEstimate { - public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class); + // public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class); protected VectorMapJoinFastValueStore valueStore; private BytesWritable testValueBytesWritable; + private long fullOuterNullKeyValueRef; + + private static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator { + + private VectorMapJoinFastLongHashMap hashMap; + + private boolean noMore; + private boolean keyIsNull; + + private WriteBuffers.Position nonMatchedReadPos; + + private ByteSegmentRef nonMatchedKeyByteSegmentRef; + + private VectorMapJoinFastValueStore.HashMapResult nonMatchedHashMapResult; + + NonMatchedLongHashMapIterator(MatchTracker matchTracker, + VectorMapJoinFastLongHashMap hashMap) { + super(matchTracker); + this.hashMap = hashMap; + } + + @Override + public void init() { + super.init(); + noMore = false; + keyIsNull = false; + nonMatchedHashMapResult = new VectorMapJoinFastValueStore.HashMapResult(); + } + + @Override + public boolean findNextNonMatched() { + if (noMore) { + return false; + } + while 
(true) { + nonMatchedLogicalSlotNum++; + if (nonMatchedLogicalSlotNum >= hashMap.logicalHashBucketCount){ + + // Fall below and handle Small Table NULL key. + break; + } + final int nonMatchedDoubleIndex = nonMatchedLogicalSlotNum * 2; + if (hashMap.slotPairs[nonMatchedDoubleIndex] != 0) { + if (!matchTracker.wasMatched(nonMatchedLogicalSlotNum)) { + nonMatchedHashMapResult.set( + hashMap.valueStore, hashMap.slotPairs[nonMatchedDoubleIndex]); + keyIsNull = false; + return true; + } + } + } + + // Do we have a Small Table NULL Key? + if (hashMap.fullOuterNullKeyValueRef == 0) { + return false; + } + nonMatchedHashMapResult.set( + hashMap.valueStore, hashMap.fullOuterNullKeyValueRef); + noMore = true; + keyIsNull = true; + return true; + } + + @Override + public boolean readNonMatchedLongKey() { + return !keyIsNull; + } + + @Override + public long getNonMatchedLongKey() { + return hashMap.slotPairs[nonMatchedLogicalSlotNum * 2 + 1]; + } + + @Override + public VectorMapJoinHashMapResult getNonMatchedHashMapResult() { + return nonMatchedHashMapResult; + } + } + @Override public VectorMapJoinHashMapResult createHashMapResult() { return new VectorMapJoinFastValueStore.HashMapResult(); } + @Override + public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) { + return new NonMatchedLongHashMapIterator(matchTracker, this); + } + + @Override + public void putRow(BytesWritable currentKey, BytesWritable currentValue) + throws HiveException, IOException { + + if (!adaptPutRow(currentKey, currentValue)) { + + // Ignore NULL keys, except for FULL OUTER. + if (isFullOuter) { + addFullOuterNullKeyValue(currentValue); + } + + } + } + /* * A Unit Test convenience method for putting key and value into the hash table using the * actual types. 
@@ -91,13 +193,12 @@ public class VectorMapJoinFastLongHashMap optimizedHashMapResult.forget(); long hashCode = HashCodeUtil.calculateLongHashCode(key); - // LOG.debug("VectorMapJoinFastLongHashMap lookup " + key + " hashCode " + hashCode); - long valueRef = findReadSlot(key, hashCode); + int pairIndex = findReadSlot(key, hashCode); JoinUtil.JoinResult joinResult; - if (valueRef == -1) { + if (pairIndex == -1) { joinResult = JoinUtil.JoinResult.NOMATCH; } else { - optimizedHashMapResult.set(valueStore, valueRef); + optimizedHashMapResult.set(valueStore, slotPairs[pairIndex]); joinResult = JoinUtil.JoinResult.MATCH; } @@ -107,12 +208,59 @@ public class VectorMapJoinFastLongHashMap return joinResult; } + @Override + public JoinUtil.JoinResult lookup(long key, VectorMapJoinHashMapResult hashMapResult, + MatchTracker matchTracker) { + + VectorMapJoinFastValueStore.HashMapResult optimizedHashMapResult = + (VectorMapJoinFastValueStore.HashMapResult) hashMapResult; + + optimizedHashMapResult.forget(); + + long hashCode = HashCodeUtil.calculateLongHashCode(key); + int pairIndex = findReadSlot(key, hashCode); + JoinUtil.JoinResult joinResult; + if (pairIndex == -1) { + joinResult = JoinUtil.JoinResult.NOMATCH; + } else { + if (matchTracker != null) { + matchTracker.trackMatch(pairIndex / 2); + } + optimizedHashMapResult.set(valueStore, slotPairs[pairIndex]); + + joinResult = JoinUtil.JoinResult.MATCH; + } + + optimizedHashMapResult.setJoinResult(joinResult); + + return joinResult; + } + + public void addFullOuterNullKeyValue(BytesWritable currentValue) { + + byte[] valueBytes = currentValue.getBytes(); + int valueLength = currentValue.getLength(); + + if (fullOuterNullKeyValueRef == 0) { + fullOuterNullKeyValueRef = valueStore.addFirst(valueBytes, 0, valueLength); + } else { + + // Add another value. 
+ fullOuterNullKeyValueRef = + valueStore.addMore(fullOuterNullKeyValueRef, valueBytes, 0, valueLength); + } + } + public VectorMapJoinFastLongHashMap( - boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, + boolean isFullOuter, + boolean minMaxEnabled, + HashTableKeyType hashTableKeyType, int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(minMaxEnabled, isOuterJoin, hashTableKeyType, + super( + isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); + fullOuterNullKeyValueRef = 0; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/a37827ec/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java index 228fa72..43f093d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java @@ -42,11 +42,27 @@ public class VectorMapJoinFastLongHashMultiSet public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMultiSet.class); + private long fullOuterNullKeyValueCount; + @Override public VectorMapJoinHashMultiSetResult createHashMultiSetResult() { return new VectorMapJoinFastHashMultiSet.HashMultiSetResult(); } + @Override + public void putRow(BytesWritable currentKey, BytesWritable currentValue) + throws HiveException, IOException { + + if (!adaptPutRow(currentKey, currentValue)) { + + // Ignore NULL keys, except for FULL OUTER. 
+ if (isFullOuter) { + fullOuterNullKeyValueCount++; + } + + } + } + /* * A Unit Test convenience method for putting the key into the hash table using the * actual type. @@ -80,12 +96,19 @@ public class VectorMapJoinFastLongHashMultiSet optimizedHashMultiSetResult.forget(); long hashCode = HashCodeUtil.calculateLongHashCode(key); - long count = findReadSlot(key, hashCode); + int pairIndex = findReadSlot(key, hashCode); JoinUtil.JoinResult joinResult; - if (count == -1) { + if (pairIndex == -1) { joinResult = JoinUtil.JoinResult.NOMATCH; } else { - optimizedHashMultiSetResult.set(count); + /* + * NOTE: Support for trackMatched not needed yet for Set. + + if (matchTracker != null) { + matchTracker.trackMatch(pairIndex / 2); + } + */ + optimizedHashMultiSetResult.set(slotPairs[pairIndex]); joinResult = JoinUtil.JoinResult.MATCH; } @@ -95,10 +118,15 @@ public class VectorMapJoinFastLongHashMultiSet } public VectorMapJoinFastLongHashMultiSet( - boolean minMaxEnabled, boolean isOuterJoin, HashTableKeyType hashTableKeyType, + boolean isFullOuter, + boolean minMaxEnabled, + HashTableKeyType hashTableKeyType, int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { - super(minMaxEnabled, isOuterJoin, hashTableKeyType, + super( + isFullOuter, + minMaxEnabled, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); + fullOuterNullKeyValueCount = 0; } @Override