This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 13e72c1ed846a4c2a5ee18129e773a3e2c215cdc
Author: Ben-Zvi <bben-...@mapr.com>
AuthorDate: Wed Feb 14 12:36:39 2018 -0800

    DRILL-6027: Initial implementation of HashJoin spill, without memory limits checks yet
---
 .../test/java/org/apache/drill/test/DrillTest.java |   2 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |  14 +
 .../apache/drill/exec/cache/VectorSerializer.java  |   3 +-
 .../drill/exec/physical/impl/BaseRootExec.java     |   7 +-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +
 .../physical/impl/aggregate/HashAggTemplate.java   |  19 +-
 .../impl/aggregate/SpilledRecordbatch.java         |  11 +-
 .../physical/impl/common/ChainedHashTable.java     |  14 +-
 .../exec/physical/impl/common/HashPartition.java   | 500 +++++++++++
 .../drill/exec/physical/impl/common/HashTable.java |  10 +-
 .../physical/impl/common/HashTableTemplate.java    |  59 +-
 .../exec/physical/impl/join/HashJoinBatch.java     | 945 +++++++++++++++------
 .../exec/physical/impl/join/HashJoinHelper.java    |   4 +
 .../exec/physical/impl/join/HashJoinProbe.java     |  53 --
 .../physical/impl/join/HashJoinProbeTemplate.java  | 261 ------
 .../drill/exec/physical/impl/spill/SpillSet.java   |   5 +
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   5 +
 .../validate/IteratorValidatorBatchIterator.java   |   5 +
 .../drill/exec/record/AbstractRecordBatch.java     |   5 +
 .../org/apache/drill/exec/record/RecordBatch.java  |   7 +
 .../apache/drill/exec/record/SchemalessBatch.java  |   3 +
 .../drill/exec/record/SimpleRecordBatch.java       |   5 +
 .../apache/drill/exec/record/VectorContainer.java  | 120 ++-
 .../exec/server/options/SystemOptionManager.java   |   5 +
 .../java-exec/src/main/resources/drill-module.conf |  18 +
 .../exec/physical/impl/join/TestHashJoinSpill.java | 123 +++
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |   2 +-
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |   9 +-
 .../org/apache/drill/test/DrillTestWrapper.java    |  14 +-
 .../org/apache/drill/test/rowSet/RowSetBatch.java  |   3 +
 exec/java-exec/src/test/resources/empty.json       |   0
 31 files changed, 1612 insertions(+), 623 deletions(-)

diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 24ec381..28544ab 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -56,7 +56,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;

-  @Rule public final TestRule TIMEOUT = new DisableOnDebug(TestTools.getTimeoutRule(100_000));
+  @Rule public final TestRule TIMEOUT = new DisableOnDebug(TestTools.getTimeoutRule(1_000_000));
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
   @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 671ce4b..57ecdf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -103,6 +103,20 @@ public final class ExecConstants {
   public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed");

+  // Hash Join Options
+  public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = "exec.hashjoin.num_rows_in_batch";
+  public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
+  public static
final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = "exec.hashjoin.max_batches_in_memory"; + public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 1, 65536); + public static final String HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY = "exec.hashjoin.max_batches_per_partition"; + public static final LongValidator HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY, 1, 65536); + public static final String HASHJOIN_NUM_PARTITIONS_KEY = "exec.hashjoin.num_partitions"; + public static final LongValidator HASHJOIN_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHJOIN_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling + public static final String HASHJOIN_MAX_MEMORY_KEY = "exec.hashjoin.mem_limit"; + public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE); + public static final String HASHJOIN_SPILL_DIRS = "drill.exec.hashjoin.spill.directories"; + public static final String HASHJOIN_SPILL_FILESYSTEM = "drill.exec.hashjoin.spill.fs"; + // Hash Aggregate Options public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions"; public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java index 03ea11e..b5beb62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java @@ -91,7 +91,7 @@ public class VectorSerializer { final DrillBuf[] incomingBuffers = batch.getBuffers(); final UserBitShared.RecordBatchDef batchDef = batch.getDef(); - bytesWritten = batchDef.getSerializedSize(); + int bytesWritten = batchDef.getSerializedSize(); /* Write the metadata to the file */ batchDef.writeDelimitedTo(output); @@ -115,6 +115,7 @@ public class VectorSerializer { } timeNs += timerContext.stop(); + this.bytesWritten += bytesWritten; return bytesWritten; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 76fd642..b18a78e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -139,12 +139,7 @@ public abstract class BaseRootExec implements RootExec { // close all operators. 
if (operators != null) { - final DeferredException df = new DeferredException(new Supplier<Exception>() { - @Override - public Exception get() { - return new RuntimeException("Error closing operators"); - } - }); + final DeferredException df = new DeferredException(); for (final CloseableRecordBatch crb : operators) { df.suppressingClose(crb); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index ea943b2..4a62752 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -487,6 +487,10 @@ public class ScanBatch implements CloseableRecordBatch { this.getClass().getCanonicalName())); } + @Override + public VectorContainer getContainer() { + return container; + } /** * Verify list of implicit column values is valid input: * - Either implicit column list is empty; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 4c54080..368fd2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -179,7 +179,7 @@ public abstract class HashAggTemplate implements HashAggregator { NUM_RESIZING, RESIZING_TIME_MS, NUM_PARTITIONS, - SPILLED_PARTITIONS, // number of partitions spilled to disk + SPILLED_PARTITIONS, // number of original partitions spilled to disk SPILL_MB, // Number of MB of data spilled to disk. This amount is first written, // then later re-read. So, disk I/O is twice this amount. 
// For first phase aggr -- this is an estimate of the amount of data @@ -187,8 +187,6 @@ public abstract class HashAggTemplate implements HashAggregator { SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY ; - // duplicate for hash ag - @Override public int metricId() { return ordinal(); @@ -201,8 +199,6 @@ public abstract class HashAggTemplate implements HashAggregator { private int maxOccupiedIdx = -1; private int batchOutputCount = 0; - private int capacity = Integer.MAX_VALUE; - @SuppressWarnings("resource") public BatchHolder() { @@ -233,8 +229,6 @@ public abstract class HashAggTemplate implements HashAggregator { vector.allocateNew(); } - capacity = Math.min(capacity, vector.getValueCapacity()); - aggrValuesContainer.add(vector); } success = true; @@ -464,7 +458,7 @@ public abstract class HashAggTemplate implements HashAggregator { // initialize every (per partition) entry in the arrays for (int i = 0; i < numPartitions; i++ ) { try { - this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); + this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds); this.htables[i].setMaxVarcharSize(maxColumnWidth); } catch (ClassTransformationException e) { throw UserException.unsupportedError(e) @@ -490,12 +484,13 @@ public abstract class HashAggTemplate implements HashAggregator { public RecordBatch getNewIncoming() { return newIncoming; } private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException { - baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming + baseHashTable.updateIncoming(newIncoming, null); // after a spill - a new incoming this.incoming = newIncoming; currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file nextPartitionToReturn = 0; for (int i = 0; i < numPartitions; i++ ) { - htables[i].reinit(newIncoming); + htables[i].updateIncoming(newIncoming.getContainer(), null); + htables[i].reset(); if ( batchHolders[i] != null) { for (BatchHolder bh : batchHolders[i]) { bh.clear(); @@ -547,7 +542,7 @@ public abstract class HashAggTemplate implements HashAggregator { } // multiply by the max number of rows in a batch to get the final estimated max size estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE; - // (When there are no aggr functions, use '1' as later code relies on this size being non-zero) + // (When there are no aggr functions, use '1' as later code relies on this siisDebze being non-zero) estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE; estOutgoingAllocSize = estValuesBatchSize; // initially assume same size @@ -1260,7 +1255,7 @@ public abstract class HashAggTemplate implements HashAggregator { int hashCode; try { // htables[0].updateBatches(); - hashCode = htables[0].getHashCode(incomingRowIdx); + hashCode = htables[0].getBuildHashCode(incomingRowIdx); } catch (SchemaChangeException e) { throw new UnsupportedOperationException("Unexpected schema change", e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java index c473b94..c78e2c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -48,6 +48,7 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch { private SpillSet spillSet; private String spillFile; VectorAccessibleSerializable vas; + private IterOutcome initialOutcome; public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { this.context = context; @@ -64,7 +65,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch { throw UserException.resourceError(e).build(HashAggBatch.logger); } - next(); // initialize the container + initialOutcome = next(); // initialize the container } @Override @@ -107,6 +108,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch { public VectorContainer getOutgoingContainer() { return container; } @Override + public VectorContainer getContainer() { return container; } + + @Override public int getRecordCount() { return container.getRecordCount(); } @Override @@ -156,6 +160,11 @@ public class SpilledRecordbatch implements CloseableRecordBatch { } /** + * Return the initial outcome (from the first next() call ) + */ + public IterOutcome getInitialOutcome() { return initialOutcome; } + + /** * Note: ignoring any IO errors (e.g. file not found) */ @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index e061bdd..89e32a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -116,7 +116,7 @@ public class ChainedHashTable { private final FragmentContext context; private final BufferAllocator allocator; private RecordBatch incomingBuild; - private final RecordBatch incomingProbe; + private RecordBatch incomingProbe; private final RecordBatch outgoing; public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, @@ -130,11 +130,12 @@ public class ChainedHashTable { this.outgoing = outgoing; } - public void updateIncoming(RecordBatch incomingBuild) { + public void updateIncoming(RecordBatch incomingBuild, RecordBatch incomingProbe) { this.incomingBuild = incomingBuild; + this.incomingProbe = incomingProbe; } - public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException, + public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException, IOException, SchemaChangeException { CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getOptions()); top.plainJavaCapable(true); @@ -225,14 +226,13 @@ public class ChainedHashTable { setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true); HashTable ht = context.getImplementationClass(top); - ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig); + ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig); return ht; } private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, - LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) - throws SchemaChangeException { + LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) { cg.setMappingSet(incomingMapping); if 
(keyExprs == null || keyExprs.length == 0) { @@ -276,7 +276,7 @@ public class ChainedHashTable { } private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs, - TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { + TypedFieldId[] htKeyFieldIds) { cg.setMappingSet(SetValueMapping); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java new file mode 100644 index 0000000..5b4adf1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.common; + +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.RetryAfterSpillException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.cache.VectorSerializer; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.join.HashJoinBatch; +import org.apache.drill.exec.physical.impl.join.HashJoinHelper; +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.ObjectVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * The class HashPartition + * + * Created to represent an active partition for the Hash-Join operator + * (active means: currently receiving data, or its data is being probed; as opposed to fully + * spilled partitions). 
+ * After all the build/iner data is read for this partition - if all its data is in memory, then + * a hash table and a helper are created, and later this data would be probed. + * If all this partition's build/inner data was spilled, then it begins to work as an outer + * partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch, + * currHVVector, writer, spillFile, partitionBatchesCount) for the outer. + */ +public class HashPartition { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class); + + private int partitionNum = -1; // the current number of this partition, as used by the operator + + private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; + private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars + + private final MajorType HVtype = MajorType.newBuilder() + .setMinorType(MinorType.INT /* dataType */ ) + .setMode(DataMode.REQUIRED /* mode */ ) + .build(); + + // The vector containers storing all the inner rows + // * Records are retrieved from these containers when there is a matching record + // * on the probe side + private ArrayList<VectorContainer> containers; + + // While build data is incoming - temporarily keep the list of in-memory + // incoming batches, per each partition (these may be spilled at some point) + private List<VectorContainer> tmpBatchesList; + // A batch and HV vector to hold incoming rows - per each partition + private VectorContainer currentBatch; // The current (newest) batch + private IntVector currHVVector; // The HV vectors for the currentBatches + + /* Helper class + * Maintains linked list of build side records with the same key + * Keeps information about which build records have a corresponding + * matching key in the probe side (for outer, right joins) + */ + private HashJoinHelper hjHelper; + + // Underlying hashtable used by the hash join + private HashTable hashTable; + + private VectorSerializer.Writer writer; // a vector writer for each spilled partition + private int partitionBatchesCount; // count number of batches spilled + private String spillFile; + + private BufferAllocator allocator; + private FragmentContext context; + private int RECORDS_PER_BATCH; + ChainedHashTable baseHashTable; + private SpillSet spillSet; + private boolean isSpilled; // is this partition spilled ? + private boolean processingOuter; // is (inner done spilling and) now the outer is processed? 
+ private boolean outerBatchNotNeeded; // when the inner is whole in memory + private RecordBatch buildBatch; + private RecordBatch probeBatch; + private HashJoinBatch.inMemBatchCounter inMemBatches; // shared among all partitions + private int cycleNum; + + public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable, + RecordBatch buildBatch, RecordBatch probeBatch, + int recordsPerBatch, SpillSet spillSet, int partNum, + HashJoinBatch.inMemBatchCounter inMemBatches, int cycleNum) { + this.context = context; + this.allocator = allocator; + this.baseHashTable = baseHashTable; + this.buildBatch = buildBatch; + this.probeBatch = probeBatch; + this.RECORDS_PER_BATCH = recordsPerBatch; + this.spillSet = spillSet; + this.partitionNum = partNum; + this.inMemBatches = inMemBatches; + this.cycleNum = cycleNum; + + try { + this.hashTable = baseHashTable.createAndSetupHashTable(null); + this.hashTable.setMaxVarcharSize(maxColumnWidth); + } catch (ClassTransformationException e) { + throw UserException.unsupportedError(e) + .message("Code generation error - likely an error in the code.") + .build(logger); + } catch (IOException e) { + throw UserException.resourceError(e) + .message("IO Error while creating a hash table.") + .build(logger); + } catch (SchemaChangeException sce) { + throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce); + } + this.hjHelper = new HashJoinHelper(context, allocator); + tmpBatchesList = new ArrayList<>(); + allocateNewCurrentBatchAndHV(); + } + + /** + * Allocate a new vector container for either right or left record batch + * Add an additional special vector for the hash values + * Note: this call may OOM !! + * @param rb - either the right or the left record batch + * @return the new vector container + */ + private VectorContainer allocateNewVectorContainer(RecordBatch rb) { + VectorContainer newVC = new VectorContainer(); + VectorContainer fromVC = rb.getContainer(); + Iterator<VectorWrapper<?>> vci = fromVC.iterator(); + boolean success = false; + + try { + while (vci.hasNext()) { + VectorWrapper vw = vci.next(); + // If processing a spilled container, skip the last column (HV) + if ( cycleNum > 0 && ! vci.hasNext() ) { break; } + ValueVector vv = vw.getValueVector(); + ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator); + newVC.add(newVV); // add first to allow dealloc in case of an OOM + + if (newVV instanceof FixedWidthVector) { + ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH); + } else if (newVV instanceof VariableWidthVector) { + // Need to check - (is this case ever used?) if a varchar falls under ObjectVector which is allocated on the heap ! + ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * RECORDS_PER_BATCH, RECORDS_PER_BATCH); + } else if (newVV instanceof ObjectVector) { + ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH); + } else { + newVV.allocateNew(); + } + } + + newVC.setRecordCount(0); + inMemBatches.inc(); ; // one more batch in memory + success = true; + } finally { + if ( !success ) { + newVC.clear(); // in case of an OOM + } + } + return newVC; + } + + /** + * Allocate a new current Vector Container and current HV vector + */ + public void allocateNewCurrentBatchAndHV() { + if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in memory + currentBatch = allocateNewVectorContainer(processingOuter ? 
probeBatch : buildBatch); + currHVVector = new IntVector(MaterializedField.create("Hash_Values", HVtype), allocator); + currHVVector.allocateNew(RECORDS_PER_BATCH); + } + + /** + * Spills if needed + */ + public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, boolean needsSpill) { + + int pos = currentBatch.appendRow(buildContainer,ind); + currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column + if ( pos + 1 == RECORDS_PER_BATCH ) { + completeAnInnerBatch(true, needsSpill); + } + } + + /** + * Outer always spills when batch is full + * + */ + public void appendOuterRow(int hashCode, int recordsProcessed) { + int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed); + currHVVector.getMutator().set(pos, hashCode); // store the hash value in the new column + if ( pos + 1 == RECORDS_PER_BATCH ) { + completeAnOuterBatch(true); + } + } + + public void completeAnOuterBatch(boolean toInitialize) { + completeABatch(toInitialize, true); + } + public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) { + completeABatch(toInitialize, needsSpill); + } + /** + * A current batch is full (or no more rows incoming) - complete processing this batch + * I.e., add it to its partition's tmp list, if needed - spill that list, and if needed - + * (that is, more rows are coming) - initialize with a new current batch for that partition + * */ + private void completeABatch(boolean toInitialize, boolean needsSpill) { + if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) { + currentBatch.add(currHVVector); + currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE); + tmpBatchesList.add(currentBatch); + partitionBatchesCount++; + } else { + freeCurrentBatchAndHVVector(); + } + if ( needsSpill ) { // spill this batch/partition and free its memory + spillThisPartition(tmpBatchesList, processingOuter ? "outer" : "inner"); + } + if ( toInitialize ) { // allocate a new batch and HV vector + allocateNewCurrentBatchAndHV(); + } else { + currentBatch = null; + currHVVector = null; + } + } + + private void spillThisPartition(List<VectorContainer> vcList, String side) { + if ( vcList.size() == 0 ) { return; } // in case empty - nothing to spill + logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, vcList.size()); + + // If this is the first spill for this partition, create an output stream + if ( writer == null ) { + // A special case - when (outer is) empty + if ( vcList.get(0).getRecordCount() == 0 ) { + VectorContainer vc = vcList.remove(0); + inMemBatches.dec(); + vc.zeroVectors(); + return; + } + String suffix = cycleNum > 0 ? 
side + "_" + Integer.toString(cycleNum) : side; + spillFile = spillSet.getNextSpillFile(suffix); + + try { + writer = spillSet.writer(spillFile); + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("Hash Join failed to open spill file: " + spillFile) + .build(logger); + } + + isSpilled = true; + } + + while ( vcList.size() > 0 ) { + VectorContainer vc = vcList.remove(0); + inMemBatches.dec(); + + int numRecords = vc.getRecordCount(); + if (numRecords == 0) { // Spilling should to skip an empty batch + vc.zeroVectors(); + continue; + } + + // set the value count for outgoing batch value vectors + for (VectorWrapper<?> v : vc) { + v.getValueVector().getMutator().setValueCount(numRecords); + } + + WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false); + try { + writer.write(batch, null); + } catch (IOException ioe) { + throw UserException.dataWriteError(ioe) + .message("Hash Join failed to write to output file: " + spillFile) + .build(logger); + } finally { + batch.clear(); + } + vc.zeroVectors(); + logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords); + + } + } + + // + // ===== Methods to probe the hash table and to get indices out of the helper ======= + // + + public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException { + return hashTable.probeForKey(recordsProcessed, hashCode); + } + public int getStartIndex(int probeIndex) { + /* The current probe record has a key that matches. Get the index + * of the first row in the build side that matches the current key + */ + int compositeIndex = hjHelper.getStartIndex(probeIndex); + /* Record in the build side at currentCompositeIdx has a matching record in the probe + * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT + * join we keep track of which records we need to project at the end + */ + hjHelper.setRecordMatched(compositeIndex); + return compositeIndex; + } + public int getNextIndex(int compositeIndex) { + // in case of iner rows with duplicate keys, get the next one + return hjHelper.getNextIndex(compositeIndex); + } + public void setRecordMatched(int compositeIndex) { + hjHelper.setRecordMatched(compositeIndex); + } + public List<Integer> getNextUnmatchedIndex() { + return hjHelper.getNextUnmatchedIndex(); + } + // + // ===================================================================================== + // + + public int getBuildHashCode(int ind) throws SchemaChangeException { + return hashTable.getBuildHashCode(ind); + } + public int getProbeHashCode(int ind) throws SchemaChangeException { + return hashTable.getProbeHashCode(ind); + } + public ArrayList<VectorContainer> getContainers() { + return containers; + } + + public void updateBatches() throws SchemaChangeException { + hashTable.updateBatches(); + } + public boolean isSpilled() { + return isSpilled; + } + public String getSpillFile() { + return spillFile; + } + + public int getPartitionBatchesCount() { + return partitionBatchesCount; + } + public int getPartitionNum() { + return partitionNum; + } + + private void freeCurrentBatchAndHVVector() { + if ( currentBatch != null ) { + inMemBatches.dec(); + currentBatch.clear(); + currentBatch = null; + } + if ( currHVVector != null ) { + currHVVector.clear(); + currHVVector = null; + } + } + + public void closeWriterAndDeleteFile() { + closeWriterInternal(true); + } + public void closeWriter() { // no deletion !! 
+ closeWriterInternal(false); + processingOuter = true; // After the spill file was closed + } + /** + * If exists - close the writer for this partition + * + * @param doDeleteFile Also delete the associated file + */ + private void closeWriterInternal(boolean doDeleteFile) { + try { + if ( writer != null ) { + spillSet.close(writer); + } + if ( doDeleteFile && spillFile != null ) { + spillSet.delete(spillFile); + } + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("IO Error while closing %s spill file %s", + doDeleteFile ? "and deleting" : "", + spillFile) + .build(logger); + } + spillFile = null; + writer = null; + partitionBatchesCount = 0; + } + + /** + * + */ + public void buildContainersHashTableAndHelper() throws SchemaChangeException { + if ( isSpilled ) { return; } // no building for spilled partitions + containers = new ArrayList<>(); + for (int curr = 0; curr < partitionBatchesCount; curr++) { + VectorContainer nextBatch = tmpBatchesList.get(curr); + final int currentRecordCount = nextBatch.getRecordCount(); + + // For every incoming build batch, we create a matching helper batch + hjHelper.addNewBatch(currentRecordCount); + + // Holder contains the global index where the key is hashed into using the hash table + final IndexPointer htIndex = new IndexPointer(); + + assert nextBatch != null; + assert probeBatch != null; + + hashTable.updateIncoming(nextBatch, probeBatch ); + + // IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector(); + IntVector HV_vector = (IntVector) nextBatch.getLast(); + + for (int recInd = 0; recInd < currentRecordCount; recInd++) { + int hashCode = HV_vector.getAccessor().get(recInd); + try { + hashTable.put(recInd, htIndex, hashCode); + } catch (RetryAfterSpillException RE) { + throw new OutOfMemoryException("HT put"); + } // Hash Join can not retry yet + /* Use the global index returned by the hash table, to store + * the current record index and batch index. This will be used + * later when we probe and find a match. 
+ */ + hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); + } + + containers.add(nextBatch); + } + outerBatchNotNeeded = true; // the inner is whole in memory, no need for an outer batch + } + + public void getStats(HashTableStats newStats) { + hashTable.getStats(newStats); + } + + public void clearHashTableAndHelper() { + if (hashTable != null) { + hashTable.clear(); + hashTable = null; + } + if (hjHelper != null) { + hjHelper.clear(); + hjHelper = null; + } + } + + public void close() { + freeCurrentBatchAndHVVector(); + if (containers != null && !containers.isEmpty()) { + for (VectorContainer vc : containers) { + vc.clear(); + } + } + while ( tmpBatchesList.size() > 0 ) { + VectorContainer vc = tmpBatchesList.remove(0); + inMemBatches.dec(); + vc.clear(); + } + closeWriter(); + partitionBatchesCount = 0; + spillFile = null; + clearHashTableAndHelper(); + if ( containers != null ) { containers.clear(); } + } + +} // class HashPartition diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index a5eb1f2..ed8f388 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -46,16 +46,18 @@ public interface HashTable { int BATCH_SIZE = Character.MAX_VALUE + 1; int BATCH_MASK = 0x0000FFFF; - void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, + void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); void updateBatches() throws SchemaChangeException; - int getHashCode(int incomingRowIdx) throws SchemaChangeException; + int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException; + + int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException; PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException; - int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; + int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException; void getStats(HashTableStats stats); @@ -65,7 +67,7 @@ public interface HashTable { void clear(); - void reinit(RecordBatch newIncoming); + public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe); void reset(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 64f8144..b7a7f7b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -79,7 +79,7 @@ public abstract class HashTableTemplate implements HashTable { private BufferAllocator allocator; // The incoming build side record batch - private RecordBatch incomingBuild; + private VectorContainer incomingBuild; // The incoming probe side record batch (may be null) private RecordBatch incomingProbe; @@ -417,7 +417,7 @@ public abstract class HashTableTemplate implements HashTable { @RuntimeOverridden protected void setupInterior( - @Named("incomingBuild") 
RecordBatch incomingBuild, + @Named("incomingBuild") VectorContainer incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe, @Named("outgoing") RecordBatch outgoing, @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException { @@ -447,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable { @Override - public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { + public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); @@ -572,10 +572,31 @@ public abstract class HashTableTemplate implements HashTable { throw new RetryAfterSpillException(); } - public int getHashCode(int incomingRowIdx) throws SchemaChangeException { + /** + * Return the Hash Value for the row in the Build incoming batch at index: + * (For Hash Aggregate there's no "Build" side -- only one batch - this one) + * + * @param incomingRowIdx + * @return + * @throws SchemaChangeException + */ + @Override + public int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException { return getHashBuild(incomingRowIdx, 0); } + /** + * Return the Hash Value for the row in the Probe incoming batch at index: + * + * @param incomingRowIdx + * @return + * @throws SchemaChangeException + */ + @Override + public int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException { + return getHashProbe(incomingRowIdx, 0); + } + /** put() uses the hash code (from gethashCode() above) to insert the key(s) from the incoming * row into the hash table. The code selects the bucket in the startIndices, then the keys are * placed into the chained list - by storing the key values into a batch, and updating its @@ -585,7 +606,7 @@ public abstract class HashTableTemplate implements HashTable { * * @param incomingRowIdx - position of the incoming row * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch) - * @param hashCode - computed over the key(s) by calling getHashCode() + * @param hashCode - computed over the key(s) by calling getBuildHashCode() * @return Status - the key(s) was ADDED or was already PRESENT */ @Override @@ -659,17 +680,24 @@ public abstract class HashTableTemplate implements HashTable { PutStatus.KEY_ADDED; // otherwise } - // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key - @Override - public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException { - int seedValue = 0; - int hash = isProbe ? getHashProbe(incomingRowIdx, seedValue) : getHashBuild(incomingRowIdx, seedValue); - int bucketIndex = getBucketIndex(hash, numBuckets()); + /** + * Return -1 if Probe-side key is not found in the (build-side) hash table. 
+ * Otherwise, return the global index of the key + * + * + * @param incomingRowIdx + * @param hashCode - The hash code for the Probe-side key + * @return -1 if key is not found, else return the global index of the key + * @throws SchemaChangeException + */ + @Override + public int probeForKey(int incomingRowIdx, int hashCode) throws SchemaChangeException { + int bucketIndex = getBucketIndex(hashCode, numBuckets()); for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex); currentIdxHolder.value != EMPTY_SLOT; ) { BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK); - if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) { + if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, true /* isProbe */)) { return currentIdxHolder.value; } } @@ -776,9 +804,10 @@ public abstract class HashTableTemplate implements HashTable { batchHolders = new ArrayList<BatchHolder>(); startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT); } - public void reinit(RecordBatch newIncoming) { + public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) { incomingBuild = newIncoming; - reset(); + incomingProbe = newIncomingProbe; + // reset(); try { updateBatches(); // Needed to update the value vectors in the generated code with the new incoming } catch (SchemaChangeException e) { @@ -806,7 +835,7 @@ public abstract class HashTableTemplate implements HashTable { public void setMaxVarcharSize(int size) { maxVarcharSize = size; } // These methods will be code-generated in the context of the outer class - protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException; + protected abstract void doSetup(@Named("incomingBuild") VectorContainer incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException; protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("seedValue") int seedValue) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index b126255..998e0b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -20,9 +20,12 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.RetryAfterSpillException; + +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.NamedExpression; @@ -31,43 +34,37 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.compile.sig.GeneratorMapping; -import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; -import 
org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.memory.BaseAllocator; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; -import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.physical.impl.common.Comparator; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.common.HashPartition; +import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; -import com.sun.codemodel.JExpr; -import com.sun.codemodel.JExpression; -import com.sun.codemodel.JVar; - public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { - public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024; - public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000; + protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class); + + private int RECORDS_PER_BATCH = 128; // 1024; // internal batches + private static final int TARGET_RECORDS_PER_BATCH = 4000; // Join type, INNER, LEFT, RIGHT or OUTER private final JoinRelType joinType; @@ -77,84 +74,84 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private final List<Comparator> comparators; - // Runtime generated class implementing HashJoinProbe interface - private HashJoinProbe hashJoinProbe = null; + // Fields used for partitioning + private int numPartitions = 1; // must be 2 to the power of bitsInMask (set in setup()) + private int partitionMask = 0; // numPartitions - 1 + private int bitsInMask = 0; // number of bits in the MASK + private ChainedHashTable baseHashTable; + private boolean buildSideIsEmpty = true; + private boolean canSpill = true; + private boolean wasKilled; // a kill was received, may need to clean spilled partns - /* Helper class - * Maintains linked list of build side records with the same key - * Keeps information about which build records have a corresponding - * matching key in the probe side (for outer, right joins) - */ - private HashJoinHelper hjHelper = null; - - // Underlying hashtable used by the hash join - private HashTable hashTable = null; - - /* Hyper container to store all build side record batches. 
- * Records are retrieved from this container when there is a matching record - * on the probe side - */ - private ExpandableHyperContainer hyperContainer; + HashPartition partitions[]; // Number of records in the output container private int outputRecords; - // Current batch index on the build side - private int buildBatchIndex = 0; - // Schema of the build side private BatchSchema rightSchema = null; + private final HashTableStats htStats = new HashTableStats(); + + private final MajorType HVtype = MajorType.newBuilder() + .setMinorType(org.apache.drill.common.types.TypeProtos.MinorType.INT /* dataType */ ) + .setMode(DataMode.REQUIRED /* mode */ ) + .build(); + + private int rightHVColPosition; + private BufferAllocator allocator; + // Local fields for left/right incoming - may be replaced when reading from spilled + private RecordBatch buildBatch; + private RecordBatch probeBatch; + + // For handling spilling + private SpillSet spillSet; + HashJoinPOP popConfig; + + private int cycleNum = 0; // primary, secondary, tertiary, etc. + private int originalPartition = -1; // the partition a secondary reads from + IntVector read_HV_vector; // HV vector that was read from the spilled batch + private int MAX_BATCHES_IN_MEMORY; + private int MAX_BATCHES_PER_PARTITION; + + public class inMemBatchCounter { + private int inMemBatches; + public void inc() { inMemBatches++; } + public void dec() { inMemBatches--; } + public int value() { return inMemBatches; } + } + public inMemBatchCounter inMemBatches = new inMemBatchCounter(); - // Generator mapping for the build side - // Generator mapping for the build side : scalar - private static final GeneratorMapping PROJECT_BUILD = - GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */, - null /* cleanup */); - // Generator mapping for the build side : constant - private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */, - "doSetup" /* eval method */, - null /* reset */, null /* cleanup */); - - // Generator mapping for the probe side : scalar - private static final GeneratorMapping PROJECT_PROBE = - GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval method */, null /* reset */, - null /* cleanup */); - // Generator mapping for the probe side : constant - private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */, - "doSetup" /* eval method */, - null /* reset */, null /* cleanup */); - - - // Mapping set for the build side - private final MappingSet projectBuildMapping = - new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */, "buildBatch" /* read container */, - "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD); - - // Mapping set for the probe side - private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */, - "probeBatch" /* read container */, - "outgoing" /* write container */, - PROJECT_PROBE_CONSTANT, PROJECT_PROBE); - - // indicates if we have previously returned an output batch - boolean firstOutputBatch = true; + private static class HJSpilledPartition { + public int innerSpilledBatches; + public String innerSpillFile; + public int outerSpilledBatches; + public String outerSpillFile; + int cycleNum; + int origPartn; + int prevOrigPartn; } - private final HashTableStats htStats = new HashTableStats(); + private ArrayList<HJSpilledPartition> 
spilledPartitionsList; + private HJSpilledPartition spilledInners[]; // for the outer to find the partition + private int operatorId; // for the spill file name public enum Metric implements MetricDef { NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, - RESIZING_TIME_MS; + RESIZING_TIME_MS, + NUM_PARTITIONS, + SPILLED_PARTITIONS, // number of original partitions spilled to disk + SPILL_MB, // Number of MB of data spilled to disk. This amount is first written, + // then later re-read. So, disk I/O is twice this amount. + SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY + ; // duplicate for hash ag @Override - public int metricId() { - return ordinal(); - } + public int metricId() { return ordinal(); } } @Override @@ -169,31 +166,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } // Initialize the hash join helper context - hjHelper = new HashJoinHelper(context, oContext.getAllocator()); - try { - rightSchema = right.getSchema(); - final VectorContainer vectors = new VectorContainer(oContext); - for (final VectorWrapper<?> w : right) { - vectors.addOrGet(w.getField()); - } - vectors.buildSchema(SelectionVectorMode.NONE); - vectors.setRecordCount(0); - hyperContainer = new ExpandableHyperContainer(vectors); - hjHelper.addNewBatch(0); - buildBatchIndex++; - if (isFurtherProcessingRequired(rightUpstream) && this.right.getRecordCount() > 0) { - setupHashTable(); - } - hashJoinProbe = setupHashJoinProbe(); - // Build the container schema and set the counts - for (final VectorWrapper<?> w : container) { - w.getValueVector().allocateNew(); - } - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - container.setRecordCount(outputRecords); - } catch (IOException | ClassTransformationException e) { - throw new SchemaChangeException(e); + if (rightUpstream != IterOutcome.NONE) { + setupHashTable(); } + setupOutputContainerSchema(); + // Build the container schema and set the counts + for (final VectorWrapper<?> w : container) { + w.getValueVector().allocateNew(); + } + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container.setRecordCount(outputRecords); } @Override @@ -205,19 +187,21 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { if (state == BatchState.FIRST) { // Build the hash table, using the build side record batches. executeBuildPhase(); - hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable, - hjHelper, joinType, leftUpstream); // Update the hash table related stats for the operator - updateStats(this.hashTable); + updateStats(); + // + setupProbe(); } // Store the number of records projected - if ((hashTable != null && !hashTable.isEmpty()) || joinType != JoinRelType.INNER) { + + if ( ! buildSideIsEmpty || // If there are build-side rows + joinType != JoinRelType.INNER) { // or if this is a left/full outer join // Allocate the memory for the vectors in the output container allocateVectors(); - outputRecords = hashJoinProbe.probeAndProject(); + outputRecords = probeAndProject(); /* We are here because of one the following * 1. 
Completed processing of all the records and we are done @@ -235,34 +219,94 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return IterOutcome.OK; } + + // Free all partitions' in-memory data structures + // (In case need to start processing spilled partitions) + for ( HashPartition partn : partitions ) { + partn.close(); + } + + // + // (recursively) Handle the spilled partitions, if any + // + if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) { + // Get the next (previously) spilled partition to handle as incoming + HJSpilledPartition currSp = spilledPartitionsList.remove(0); + + // Create a BUILD-side "incoming" out of the inner spill file of that partition + buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet); + // The above ctor call also got the first batch; need to update the outcome + rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome(); + + if ( currSp.outerSpilledBatches > 0 ) { + // Create a PROBE-side "incoming" out of the outer spill file of that partition + probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet); + // The above ctor call also got the first batch; need to update the outcome + leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome(); + } else { + probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming() + leftUpstream = IterOutcome.NONE; + changeToFinalProbeState(); + } + + // update the cycle num if needed + // The current cycle num should always be one larger than in the spilled partition + if (cycleNum == currSp.cycleNum) { + cycleNum = 1 + currSp.cycleNum; + stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats + // report first spill or memory stressful situations + if (cycleNum == 1) { logger.info("Started reading spilled records "); } + if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); } + if (cycleNum == 3) { logger.warn("TERTIARY SPILLING "); } + if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); } + if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); } + if ( cycleNum * bitsInMask > 20 ) { + spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files + this.cleanup(); + throw UserException + .unsupportedError() + .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)") + .build(logger); + } + } + logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). 
More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size()); + + state = BatchState.FIRST; // build again, initialize probe, etc + + return innerNext(); // start processing the next spilled partition "recursively" + } + } else { // Our build side is empty, we won't have any matches, clear the probe side if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper<?> wrapper : left) { + for (final VectorWrapper<?> wrapper : probeBatch) { wrapper.getValueVector().clear(); } - left.kill(true); - leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); + probeBatch.kill(true); + leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper<?> wrapper : left) { + for (final VectorWrapper<?> wrapper : probeBatch) { wrapper.getValueVector().clear(); } - leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); + leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); } } } // No more output records, clean up and return state = BatchState.DONE; + + this.cleanup(); + return IterOutcome.NONE; - } catch (ClassTransformationException | SchemaChangeException | IOException e) { + } catch (SchemaChangeException e) { context.getExecutorState().fail(e); killIncoming(false); return IterOutcome.STOP; } } - public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException { + private void setupHashTable() throws SchemaChangeException { // Setup the hash table configuration object int conditionsSize = conditions.size(); final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize); @@ -278,43 +322,103 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) { leftExpr = null; } else { - if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) { - final String errorMsg = new StringBuilder() - .append("Hash join does not support probe batch with selection vectors. ") - .append("Probe batch has selection mode = ") - .append(left.getSchema().getSelectionVectorMode()) - .toString(); + if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) { + final String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. 
").append("Probe batch has selection mode = ").append + (probeBatch.getSchema().getSelectionVectorMode()).toString(); throw new SchemaChangeException(errorMsg); } } + final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators); - final HashTableConfig htConfig = - new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), - HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators); // Create the chained hash table - final ChainedHashTable ht = - new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); - hashTable = ht.createAndSetupHashTable(null, 1); + baseHashTable = + new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); } - public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { - //Setup the underlying hash table + /** + * Call only after num partitions is known + */ + private void delayedSetup() { + // + // Find out the estimated max batch size, etc + // and compute the max numPartitions possible + // + // numPartitions = 8; // just for initial work; change later + // partitionMask = 7; + // bitsInMask = 3; + + // SET FROM CONFIGURATION OPTIONS : + // ================================ + + // Set the number of partitions from the configuration (raise to a power of two, if needed) + numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR); + if ( numPartitions == 1 ) { // + canSpill = false; + logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1"); + } + numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 + // Based on the number of partitions: Set the mask and bit count + partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F + bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + + RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR); + + MAX_BATCHES_IN_MEMORY = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR); + MAX_BATCHES_PER_PARTITION = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR); + + // ================================= + + // Create the FIFO list of spilled partitions (pairs - inner/outer) + spilledPartitionsList = new ArrayList<>(); + + // Create array for the partitions + partitions = new HashPartition[numPartitions]; + + buildSideIsEmpty = false; + } + + /** + * Initialize fields (that may be reused when reading spilled partitions) + */ + private void initializeBuild() { + assert inMemBatches.value() == 0; // check that no in-memory batches left + baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files + // Recreate the partitions every time build is initialized + for (int part = 0; part < numPartitions; part++ ) { + partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, + RECORDS_PER_BATCH, spillSet, part, inMemBatches, cycleNum); + } + + spilledInners = new HJSpilledPartition[numPartitions]; + + } + /** + * Execute the BUILD phase; first read incoming and split rows into partitions; + * may decide to spill some of the partitions + * + * @throws SchemaChangeException + */ + public void executeBuildPhase() throws SchemaChangeException { + final HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = new HashJoinMemoryCalculatorImpl().next(); + boolean hasProbeData = leftUpstream != IterOutcome.NONE; + + if ( rightUpstream == IterOutcome.NONE ) { return; } // empty right // skip first batch if count is zero, as it may be an empty schema batch - if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) { - for (final VectorWrapper<?> w : right) { + if (false && buildBatch.getRecordCount() == 0) { + for (final VectorWrapper<?> w : buildBatch) { w.clear(); } - rightUpstream = next(right); - if (isFurtherProcessingRequired(rightUpstream) && - right.getRecordCount() > 0 && hashTable == null) { - setupHashTable(); - } + rightUpstream = next(buildBatch); } - boolean moreData = true; + //Setup the underlying hash table + if ( cycleNum == 0 ) { delayedSetup(); } // first time only + + initializeBuild(); + boolean moreData = true; while (moreData) { switch (rightUpstream) { case OUT_OF_MEMORY: @@ -325,90 +429,123 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { continue; case OK_NEW_SCHEMA: - if (rightSchema == null) { - rightSchema = right.getSchema(); - - if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) { - final String errorMsg = new StringBuilder() - .append("Hash join does not support build batch with selection vectors. 
") - .append("Build batch has selection mode = ") - .append(left.getSchema().getSelectionVectorMode()) - .toString(); - - throw new SchemaChangeException(errorMsg); - } - setupHashTable(); - } else { - if (!rightSchema.equals(right.getSchema())) { - throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, right.getSchema()); - } - hashTable.updateBatches(); + if (!rightSchema.equals(buildBatch.getSchema())) { + throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema()); } + for (HashPartition partn : partitions) { partn.updateBatches(); } // Fall through case OK: - final int currentRecordCount = right.getRecordCount(); + final int currentRecordCount = buildBatch.getRecordCount(); + + if ( cycleNum > 0 ) { + read_HV_vector = (IntVector) buildBatch.getContainer().getLast(); + } + // For every record in the build batch, hash the key columns and keep the result + for (int ind = 0; ind < currentRecordCount; ind++) { + int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind) + : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column + int currPart = hashCode & partitionMask ; + hashCode >>>= bitsInMask; +/* + int pos = currentBatches[currPart].appendRow(buildBatch.getContainer(),ind); + currHVVectors[currPart].getMutator().set(pos, hashCode); // store the hash value in the new column + if ( pos + 1 == RECORDS_PER_BATCH ) { + // The current decision on when-to-spill is crude + completeAnInnerBatch(currPart,true, + isSpilled(currPart) || // once spilled - then spill every new full batch + canSpill && + ( inMemBatches > MAX_BATCHES_IN_MEMORY || + tmpBatchesList[currPart].size() > MAX_BATCHES_PER_PARTITION )); + } +*/ + + // Append the new inner row to the appropriate partition; spill (that partition) if needed + partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, + // The current decision on when-to-spill is crude ... + partitions[currPart].isSpilled() || // once spilled - then spill every new full batch + canSpill && + ( inMemBatches.value() > MAX_BATCHES_IN_MEMORY || + partitions[currPart].getPartitionBatchesCount() > MAX_BATCHES_PER_PARTITION ) ); // may spill if needed + } + + if ( read_HV_vector != null ) { + read_HV_vector.clear(); + read_HV_vector = null; + } + break; + } + // Get the next incoming record batch + rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch); + } + + // Move the remaining current batches into their temp lists, or spill + // them if the partition is spilled. 
Add the spilled partitions into + // the spilled partitions list + for (HashPartition partn : partitions) { + partn.completeAnInnerBatch(false, partn.isSpilled() ); + if ( partn.isSpilled() ) { + HJSpilledPartition sp = new HJSpilledPartition(); + sp.innerSpillFile = partn.getSpillFile(); + sp.innerSpilledBatches = partn.getPartitionBatchesCount(); + sp.cycleNum = cycleNum; // remember the current cycle + sp.origPartn = partn.getPartitionNum(); // for debugging / filename + sp.prevOrigPartn = originalPartition; // for debugging / filename + spilledPartitionsList.add(sp); + + spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later + partn.closeWriter(); + } + } + + // + // Traverse all the in-memory partitions' incoming batches, and build their hash tables + // +/* + for (int currPart = 0; currPart < numPartitions; currPart++) { + + // each partition is a regular array of batches + ArrayList<VectorContainer> thisPart = new ArrayList<>(); - /* For every new build batch, we store some state in the helper context - * Add new state to the helper context - */ - hjHelper.addNewBatch(currentRecordCount); + for (int curr = 0; curr < partitionBatchesCount[currPart]; curr++) { + VectorContainer nextBatch = tmpBatchesList[currPart].get(curr); + final int currentRecordCount = nextBatch.getRecordCount(); + + // For every incoming build batch, we create a matching helper batch + hjHelpers[currPart].addNewBatch(currentRecordCount); // Holder contains the global index where the key is hashed into using the hash table final IndexPointer htIndex = new IndexPointer(); - // For every record in the build batch , hash the key columns - for (int i = 0; i < currentRecordCount; i++) { - int hashCode = hashTable.getHashCode(i); - try { - hashTable.put(i, htIndex, hashCode); - } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put");} // Hash Join can not retry yet - /* Use the global index returned by the hash table, to store - * the current record index and batch index. This will be used - * later when we probe and find a match. - */ - hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i); - } + hashTables[currPart].updateIncoming(nextBatch, probeBatch ); - /* Completed hashing all records in this batch. Transfer the batch - * to the hyper vector container. Will be used when we want to retrieve - * records that have matching keys on the probe side. - */ - final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator()); - boolean success = false; - try { - if (hyperContainer == null) { - hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer()); - } else { - hyperContainer.addBatch(nextBatch.getContainer()); - } + IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector(); - // completed processing a batch, increment batch index - buildBatchIndex++; - success = true; - } finally { - if (!success) { - nextBatch.clear(); - } + for (int recInd = 0; recInd < currentRecordCount; recInd++) { + int hashCode = HV_vector.getAccessor().get(recInd); + try { + hashTables[currPart].put(recInd, htIndex, hashCode); + } catch (RetryAfterSpillException RE) { + throw new OutOfMemoryException("HT put"); + } // Hash Join can not retry yet + // Use the global index returned by the hash table, to store + //the current record index and batch index. This will be used + // later when we probe and find a match. 
+ // + hjHelpers[currPart].setCurrentIndex(htIndex.value, curr , recInd); } - break; + + thisPart.add(nextBatch); } - // Get the next record batch - rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); - } - } - public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { - final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); - cg.plainJavaCapable(true); - final ClassGenerator<HashJoinProbe> g = cg.getRoot(); + partitionContainers.add(thisPart); +*/ + for (HashPartition partn : partitions) { + partn.buildContainersHashTableAndHelper(); + } - // Generate the code to project build side records - g.setMappingSet(projectBuildMapping); + } - int fieldId = 0; - final JExpression buildIndex = JExpr.direct("buildIndex"); - final JExpression outIndex = JExpr.direct("outIndex"); - g.rotateBlock(); + private void setupOutputContainerSchema() { if (rightSchema != null) { for (final MaterializedField field : rightSchema) { @@ -427,27 +564,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { final MaterializedField projected = field.withType(outputType); // Add the vector to our output container container.addOrGet(projected); - - final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId)); - final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); - g.getEvalBlock().add(outVV.invoke("copyFromSafe") - .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) - .arg(outIndex) - .arg(inVV.component(buildIndex.shrz(JExpr.lit(16))))); - g.rotateBlock(); - fieldId++; } } - // Generate the code to project probe side records - g.setMappingSet(projectProbeMapping); - - int outputFieldId = fieldId; - fieldId = 0; - final JExpression probeIndex = JExpr.direct("probeIndex"); - if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) { - for (final VectorWrapper<?> vv : left) { + for (final VectorWrapper<?> vv : probeBatch) { final MajorType inputType = vv.getField().getType(); final MajorType outputType; @@ -465,25 +586,22 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { vv.getValueVector().makeTransferPair(v); v.clear(); } - - final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); - final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); - - g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV)); - g.rotateBlock(); - fieldId++; - outputFieldId++; } } - final HashJoinProbe hj = context.getImplementationClass(cg); - return hj; } private void allocateVectors() { for (final VectorWrapper<?> v : container) { v.getValueVector().allocateNew(); } + container.setRecordCount(0); // reset container's counter back to zero records + } + + // (After the inner side was read whole) - Has that inner partition spilled + public boolean isSpilledInner(int part) { + if ( spilledInners == null ) { return false; } // empty inner + return spilledInners[part] != null; } public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, @@ -491,56 +609,379 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { RecordBatch right /*Build side record batch*/ ) throws OutOfMemoryException { super(popConfig, context, true, left, right); + this.buildBatch = right; + 
this.probeBatch = left; joinType = popConfig.getJoinType(); conditions = popConfig.getConditions(); + this.popConfig = popConfig; comparators = Lists.newArrayListWithExpectedSize(conditions.size()); + // When DRILL supports Java 8, use the following instead of the for() loop + // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); for (int i=0; i<conditions.size(); i++) { JoinCondition cond = conditions.get(i); comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)); } + this.allocator = oContext.getAllocator(); + + final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR); + + if (memLimit != 0) { + allocator.setLimit(memLimit); + } + + RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR); + maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR); + + spillSet = new SpillSet(context, popConfig); + + // Create empty partitions (in the ctor - covers the case where right side is empty) + partitions = new HashPartition[0]; } - private void updateStats(HashTable htable) { - if (htable == null) { - return; + public void cleanup() { + if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean + if ( spillSet.getWriteBytes() > 0 ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled + (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); + } + // clean (and deallocate) each partition + for (HashPartition partn : partitions) { + partn.clearHashTableAndHelper(); + partn.closeWriterAndDeleteFile(); + } + + // delete any spill file left in unread spilled partitions + while ( ! spilledPartitionsList.isEmpty() ) { + HJSpilledPartition sp = spilledPartitionsList.remove(0); + try { + spillSet.delete(sp.innerSpillFile); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile); + } + try { // outer file is added later; may be null if cleaning prematurely + if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); } + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile); + } + } + // Delete the currently handled (if any) spilled files + spillSet.close(); // delete the spill directory(ies) + } + + private void updateStats() { + if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty + if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files + long numSpilled = 0; + HashTableStats newStats = new HashTableStats(); + // sum the stats from all the partitions + for ( HashPartition partn : partitions ) { + if ( partn.isSpilled() ) { numSpilled++; } + partn.getStats(newStats); + htStats.addStats(newStats); } - htable.getStats(htStats); - stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); - stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); - stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); - stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime); + + this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); + this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); + this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); + this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime); + this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions); + this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill + 
this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled); } @Override public void killIncoming(boolean sendUpstream) { - left.kill(sendUpstream); - right.kill(sendUpstream); + wasKilled = true; + probeBatch.kill(sendUpstream); + buildBatch.kill(sendUpstream); } @Override public void close() { - if (hjHelper != null) { - hjHelper.clear(); + for ( HashPartition partn : partitions ) { + partn.close(); } + cleanup(); + super.close(); + } + + // ============================================================== + // + // Methods used for the probe + // + // ============================================================ + private BatchSchema probeSchema; + + enum ProbeState { + PROBE_PROJECT, PROJECT_RIGHT, DONE + } + + private int currRightPartition = 0; // for returning RIGHT/FULL + + // Number of records to process on the probe side + private int recordsToProcess = 0; + + // Number of records processed on the probe side + private int recordsProcessed = 0; + + // Indicate if we should drain the next record from the probe side + private boolean getNextRecord = true; + + // Contains both batch idx and record idx of the matching record in the build side + private int currentCompositeIdx = -1; + + // Current state the hash join algorithm is in + private ProbeState probeState = ProbeState.PROBE_PROJECT; - // If we didn't receive any data, hyperContainer may be null, check before clearing - if (hyperContainer != null) { - hyperContainer.clear(); + // For outer or right joins, this is a list of unmatched records that needs to be projected + private List<Integer> unmatchedBuildIndexes = null; + + // While probing duplicates, retain current build-side partition in case need to continue + // probing later on the same chain of duplicates + private HashPartition currPartition; + + /** + * Various initialization needed to perform the probe + * Must be called AFTER the build completes + */ + private void setupProbe() { + currRightPartition = 0; // In case it's a Right/Full outer join + recordsProcessed = 0; + recordsToProcess = 0; + + probeSchema = probeBatch.getSchema(); + probeState = ProbeState.PROBE_PROJECT; + + // A special case - if the left was an empty file + if ( leftUpstream == IterOutcome.NONE ){ + changeToFinalProbeState(); + } else { + this.recordsToProcess = probeBatch.getRecordCount(); } - if (hashTable != null) { - hashTable.clear(); + // for those outer partitions that need spilling (cause their matching inners spilled) + // initialize those partitions' current batches and hash-value vectors + for ( HashPartition partn : partitions ) { + partn.allocateNewCurrentBatchAndHV(); + } + + if ( cycleNum > 0 ) { + if ( read_HV_vector != null ) { read_HV_vector.clear();} + if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty + read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); + } + } + } + + private void executeProjectRightPhase(int currBuildPart) { + while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { + outputRecords = + container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed), + null /* no probeBatch */, 0 /* no probe index */ ); + recordsProcessed++; + } + } + + private void executeProbePhase() throws SchemaChangeException { + + while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { + + // Check if we have processed all records in this batch we need to invoke next + if (recordsProcessed == 
recordsToProcess) { + + // Done processing all records in the previous batch, clean up! + for (VectorWrapper<?> wrapper : probeBatch) { + wrapper.getValueVector().clear(); + } + + IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); + + switch (leftUpstream) { + case NONE: + case NOT_YET: + case STOP: + recordsProcessed = 0; + recordsToProcess = 0; + changeToFinalProbeState(); + // in case some outer partitions were spilled, need to spill their last batches + for ( HashPartition partn : partitions ) { + if ( ! partn.isSpilled() ) { continue; } // skip non-spilled + partn.completeAnOuterBatch(false); + // update the partition's spill record with the outer side + HJSpilledPartition sp = spilledInners[partn.getPartitionNum()]; + sp.outerSpillFile = partn.getSpillFile(); + sp.outerSpilledBatches = partn.getPartitionBatchesCount(); + + partn.closeWriter(); + } + + continue; + + case OK_NEW_SCHEMA: + if (probeBatch.getSchema().equals(probeSchema)) { + for ( HashPartition partn : partitions ) { partn.updateBatches(); } + + } else { + throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", + probeSchema, + probeBatch.getSchema()); + } + case OK: + recordsToProcess = probeBatch.getRecordCount(); + recordsProcessed = 0; + // If we received an empty batch do nothing + if (recordsToProcess == 0) { + continue; + } + if ( cycleNum > 0 ) { + read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ? + } + } + } + int probeIndex = -1; + // Check if we need to drain the next row in the probe side + if (getNextRecord) { + + if ( !buildSideIsEmpty ) { + int hashCode = ( cycleNum == 0 ) ? + partitions[0].getProbeHashCode(recordsProcessed) + : read_HV_vector.getAccessor().get(recordsProcessed); + int currBuildPart = hashCode & partitionMask ; + hashCode >>>= bitsInMask; + + // Set and keep the current partition (may be used again on subsequent probe calls as + // inner rows of duplicate key are processed) + currPartition = partitions[currBuildPart]; // inner if not spilled, else outer + + // If the matching inner partition was spilled + if ( isSpilledInner(currBuildPart) ) { + // add this row to its outer partition (may cause a spill, when the batch is full) + + currPartition.appendOuterRow(hashCode, recordsProcessed); + + recordsProcessed++; // done with this outer record + continue; // on to the next outer record + } + + probeIndex = currPartition.probeForKey(recordsProcessed, hashCode); + + } + + if (probeIndex != -1) { + + /* The current probe record has a key that matches. Get the index + * of the first row in the build side that matches the current key + * (and record this match in the bitmap, in case of a FULL/RIGHT join) + */ + currentCompositeIdx = currPartition.getStartIndex(probeIndex); + + outputRecords = + container.appendRow(currPartition.getContainers(), currentCompositeIdx, + probeBatch.getContainer(), recordsProcessed); + + /* Projected single row from the build side with matching key but there + * may be more rows with the same key. Check if that's the case + */ + currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); + if (currentCompositeIdx == -1) { + /* We only had one row in the build side that matched the current key + * from the probe side. Drain the next row in the probe side. 
+ */ + recordsProcessed++; + } else { + /* There is more than one row with the same key on the build side + * don't drain more records from the probe side till we have projected + * all the rows with this key + */ + getNextRecord = false; + } + } else { // No matching key + + // If we have a left outer join, project the outer side + if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { + + outputRecords = + container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition); + } + recordsProcessed++; + } + } + else { // match the next inner row with the same key + + currPartition.setRecordMatched(currentCompositeIdx); + + outputRecords = + container.appendRow(currPartition.getContainers(), currentCompositeIdx, + probeBatch.getContainer(), recordsProcessed); + + currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); + + if (currentCompositeIdx == -1) { + // We don't have any more rows matching the current key on the build side, move on to the next probe row + getNextRecord = true; + recordsProcessed++; + } + } } - super.close(); } /** - * This method checks to see if join processing should be continued further. - * @param upStream up stream operator status. - * @@return true if up stream status is OK or OK_NEW_SCHEMA otherwise false. + * Perform the probe and project the results + * + * @return number of output records + * @throws SchemaChangeException */ - private boolean isFurtherProcessingRequired(IterOutcome upStream) { - return upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA; + private int probeAndProject() throws SchemaChangeException { + + outputRecords = 0; + + // When handling spilled partitions, the state becomes DONE at the end of each partition + if ( probeState == ProbeState.DONE ) { + return outputRecords; // that is zero + } + + if (probeState == ProbeState.PROBE_PROJECT) { + executeProbePhase(); + } + + if (probeState == ProbeState.PROJECT_RIGHT) { + // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join + + do { + + if (unmatchedBuildIndexes == null) { // first time for this partition ? + if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right + // Get this partition's list of build indexes that didn't match any record on the probe side + unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex(); + recordsProcessed = 0; + recordsToProcess = unmatchedBuildIndexes.size(); + } + + // Project the list of unmatched records on the build side + executeProjectRightPhase(currRightPartition); + + if ( recordsProcessed < recordsToProcess ) { // more records in this partition? + return outputRecords; // outgoing is full; report and come back later + } else { + currRightPartition++; // on to the next right partition + unmatchedBuildIndexes = null; + } + + } while ( currRightPartition < numPartitions ); + + probeState = ProbeState.DONE; // last right partition was handled; we are done now + } + + return outputRecords; } -} + + private void changeToFinalProbeState() { + // We are done with the (left) probe phase. + // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side + probeState = + (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT : + ProbeState.DONE; // else we're done + } + +} // public class HashJoinBatch diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index e8d747e..55146f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -103,6 +103,9 @@ public class HashJoinHelper { public BitSet getKeyMatchBitVector() { return keyMatchBitVector; } + public void clear() { + keyMatchBitVector.clear(); + } } public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException { @@ -231,5 +234,6 @@ public class HashJoinHelper { for (BuildInfo info : buildInfoList) { info.getLinks().clear(); } + buildInfoList.clear(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java deleted file mode 100644 index 36d9baa..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.join; - -import java.io.IOException; - -import org.apache.drill.exec.compile.TemplateClassDefinition; -import org.apache.drill.exec.exception.ClassTransformationException; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.calcite.rel.core.JoinRelType; - -public interface HashJoinProbe { - TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); - - /* The probe side of the hash join can be in the following two states - * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a - * key match and if we do we project the record - * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records - * from the build side that did not match any records on the probe side. For Left outer - * case we handle it internally by projecting the record if there isn't a match on the build side - * 3. 
DONE: Once we have projected all possible records we are done - */ - enum ProbeState { - PROBE_PROJECT, PROJECT_RIGHT, DONE - } - - void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, - JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState); - void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); - int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; - void projectBuildRecord(int buildIndex, int outIndex); - void projectProbeRecord(int probeIndex, int outIndex); -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java deleted file mode 100644 index 1a85277..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.join; - -import java.io.IOException; -import java.util.List; - -import javax.inject.Named; - -import org.apache.drill.exec.exception.ClassTransformationException; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.calcite.rel.core.JoinRelType; - -public abstract class HashJoinProbeTemplate implements HashJoinProbe { - - // Probe side record batch - private RecordBatch probeBatch; - - private BatchSchema probeSchema; - - private VectorContainer buildBatch; - - // Join type, INNER, LEFT, RIGHT or OUTER - private JoinRelType joinType; - - private HashJoinBatch outgoingJoinBatch = null; - - private static final int TARGET_RECORDS_PER_BATCH = 4000; - - /* Helper class - * Maintains linked list of build side records with the same key - * Keeps information about which build records have a corresponding - * matching key in the probe side (for outer, right joins) - */ - private HashJoinHelper hjHelper = null; - - // Underlying hashtable used by the hash join - private HashTable hashTable = null; - - // Number of records to process on the probe side - private int recordsToProcess = 0; - - // Number of records processed on the probe side - private int recordsProcessed = 0; - - // Number of records in the output container - private int outputRecords; - - // Indicate if we should drain the next record from the probe side - private boolean getNextRecord = true; - - // Contains both batch idx and record idx of the matching record in the build side - private int currentCompositeIdx = -1; - - // Current state the hash join algorithm is in - private ProbeState probeState = ProbeState.PROBE_PROJECT; - - // For outer or right joins, this is a list of unmatched records that needs to be projected - private List<Integer> unmatchedBuildIndexes = null; - - @Override - public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, - HashJoinHelper hjHelper, JoinRelType joinRelType, IterOutcome leftStartState) { - - this.probeBatch = probeBatch; - this.probeSchema = probeBatch.getSchema(); - this.buildBatch = buildBatch; - this.joinType = joinRelType; - this.recordsToProcess = probeRecordCount; - this.hashTable = hashTable; - this.hjHelper = hjHelper; - this.outgoingJoinBatch = outgoing; - - if (leftStartState == IterOutcome.NONE) { - if (joinRelType == JoinRelType.RIGHT) { - probeState = ProbeState.PROJECT_RIGHT; - } else { - probeState = ProbeState.DONE; - } - } - - doSetup(context, buildBatch, probeBatch, outgoing); - } - - public void executeProjectRightPhase() { - while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { - projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords); - recordsProcessed++; - outputRecords++; - } - } - - public void executeProbePhase() throws SchemaChangeException { - while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { - - // Check if we have processed all records in this batch we need to 
invoke next - if (recordsProcessed == recordsToProcess) { - - // Done processing all records in the previous batch, clean up! - for (VectorWrapper<?> wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - - IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch); - - switch (leftUpstream) { - case NONE: - case NOT_YET: - case STOP: - recordsProcessed = 0; - recordsToProcess = 0; - probeState = ProbeState.DONE; - - // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side - if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) { - probeState = ProbeState.PROJECT_RIGHT; - } - - continue; - - case OK_NEW_SCHEMA: - if (probeBatch.getSchema().equals(probeSchema)) { - doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, outgoingJoinBatch); - if (hashTable != null) { - hashTable.updateBatches(); - } - } else { - throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", - probeSchema, - probeBatch.getSchema()); - } - case OK: - recordsToProcess = probeBatch.getRecordCount(); - recordsProcessed = 0; - // If we received an empty batch do nothing - if (recordsToProcess == 0) { - continue; - } - } - } - int probeIndex = -1; - - // Check if we need to drain the next row in the probe side - if (getNextRecord) { - if (hashTable != null && !hashTable.isEmpty()) { - probeIndex = hashTable.containsKey(recordsProcessed, true); - } - - if (probeIndex != -1) { - - /* The current probe record has a key that matches. Get the index - * of the first row in the build side that matches the current key - */ - currentCompositeIdx = hjHelper.getStartIndex(probeIndex); - - /* Record in the build side at currentCompositeIdx has a matching record in the probe - * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT - * join we keep track of which records we need to project at the end - */ - hjHelper.setRecordMatched(currentCompositeIdx); - - projectBuildRecord(currentCompositeIdx, outputRecords); - projectProbeRecord(recordsProcessed, outputRecords); - outputRecords++; - /* Projected single row from the build side with matching key but there - * may be more rows with the same key. Check if that's the case - */ - currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); - if (currentCompositeIdx == -1) { - /* We only had one row in the build side that matched the current key - * from the probe side. Drain the next row in the probe side. 
- */ - recordsProcessed++; - } else { - /* There is more than one row with the same key on the build side - * don't drain more records from the probe side till we have projected - * all the rows with this key - */ - getNextRecord = false; - } - } else { // No matching key - - // If we have a left outer join, project the keys - if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { - projectProbeRecord(recordsProcessed, outputRecords); - outputRecords++; - } - recordsProcessed++; - } - } else { - hjHelper.setRecordMatched(currentCompositeIdx); - projectBuildRecord(currentCompositeIdx, outputRecords); - projectProbeRecord(recordsProcessed, outputRecords); - outputRecords++; - - currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); - - if (currentCompositeIdx == -1) { - // We don't have any more rows matching the current key on the build side, move on to the next probe row - getNextRecord = true; - recordsProcessed++; - } - } - } - } - - public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException { - - outputRecords = 0; - - if (probeState == ProbeState.PROBE_PROJECT) { - executeProbePhase(); - } - - if (probeState == ProbeState.PROJECT_RIGHT) { - - // We are here because we have a RIGHT OUTER or a FULL join - if (unmatchedBuildIndexes == null) { - // Initialize list of build indexes that didn't match a record on the probe side - unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex(); - recordsToProcess = unmatchedBuildIndexes.size(); - recordsProcessed = 0; - } - - // Project the list of unmatched records on the build side - executeProjectRightPhase(); - } - - return outputRecords; - } - - public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch, - @Named("outgoing") RecordBatch outgoing); - public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex); - - public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex); - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java index 2f9ab14..d0421f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.cache.VectorSerializer; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; @@ -432,6 +433,10 @@ public class SpillSet { operName = "HashAgg"; spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM); dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS); + } else if (popConfig instanceof HashJoinPOP) { + operName = "HashJoin"; + spillFs = config.getString(ExecConstants.HASHJOIN_SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.HASHJOIN_SPILL_DIRS); } else { // just use the common ones operName = "Unknown"; diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 9da8a4b..433e0c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -220,6 +220,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + @Override + public VectorContainer getContainer() { + return batchLoader.getContainer(); + } + private void informSenders() { logger.info("Informing senders of request to terminate sending."); final FragmentHandle handlePrototype = FragmentHandle.newBuilder() diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 05eb545..378c980 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -359,6 +359,11 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { this.getClass().getCanonicalName())); } + @Override + public VectorContainer getContainer() { + return incoming.getContainer(); + } + public RecordBatch getIncoming() { return incoming; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 9d383c1..054ceec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -228,4 +228,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + + @Override + public VectorContainer getContainer() { + return container; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index fe7f9e9..ded4351 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -255,6 +255,13 @@ public interface RecordBatch extends VectorAccessible { VectorContainer getOutgoingContainer(); /** + * Return the internal vector container + * + * @return The internal vector container + */ + public VectorContainer getContainer(); + + /** * Gets the value vector type and ID for the given schema path. 
The * TypedFieldId should store a fieldId which is the same as the ordinal * position of the field within the Iterator provided this class's diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java index 40447be..9dfa129 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java @@ -104,4 +104,7 @@ public class SchemalessBatch implements CloseableRecordBatch { public void close() throws Exception { // This is present to match BatchCreator#getBatch() returning type. } + + @Override + public VectorContainer getContainer() { return null; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java index b6b7b21..4063e55 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java @@ -94,4 +94,9 @@ public class SimpleRecordBatch implements RecordBatch { public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + + @Override + public VectorContainer getContainer() { + return container; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 03e9249..06a89b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -42,7 +43,10 @@ public class VectorContainer implements VectorAccessible { private final BufferAllocator allocator; protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList(); private BatchSchema schema; - private int recordCount = -1; + + private int recordCount = 0; + private boolean initialized = false; + // private BufferAllocator allocator; private boolean schemaChanged = true; // Schema has changed since last built. Must rebuild schema public VectorContainer() { @@ -209,6 +213,109 @@ public class VectorContainer implements VectorAccessible { } } + /** + * This works with non-hyper {@link VectorContainer}s which have no selection vectors. + * Appends a row taken from a source {@link VectorContainer} to this {@link VectorContainer}. + * @param srcContainer The {@link VectorContainer} to copy a row from. + * @param srcIndex The index of the row to copy from the source {@link VectorContainer}. 
+ * @return Position where the row was appended + */ + public int appendRow(VectorContainer srcContainer, int srcIndex) { + for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) { + ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); + ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector(); + destVector.copyEntry(recordCount, srcVector, srcIndex); + } + int pos = recordCount++; + initialized = true; + return pos; + } + + /** + * This method currently is only used by the Hash Join to return a row composed of build+probe rows + * + * This works with non-hyper {@link VectorContainer}s which have no selection vectors. + * Appends a row taken from two source {@link VectorContainer}s to this {@link VectorContainer}. + * @param buildSrcContainer The {@link VectorContainer} to copy the first columns of a row from. + * @param buildSrcIndex The index of the row to copy from the build side source {@link VectorContainer}. + * @param probeSrcContainer The {@link VectorContainer} to copy the last columns of a row from. + * @param probeSrcIndex The index of the row to copy from the probe side source {@link VectorContainer}. + * @return Number of records in the container after appending + */ + public int appendRowXXX(VectorContainer buildSrcContainer, int buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) { + if ( buildSrcContainer != null ) { + for (int vectorIndex = 0; vectorIndex < buildSrcContainer.wrappers.size(); vectorIndex++) { + ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); + ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector(); + destVector.copyEntry(recordCount, srcVector, buildSrcIndex); + } + } + if ( probeSrcContainer != null ) { + int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size(); + for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) { + ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); + ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex).getValueVector(); + destVector.copyEntry(recordCount, srcVector, probeSrcIndex); + } + } + recordCount++; + initialized = true; + return recordCount; + } + + private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) { + // "- 1" to skip the last "hash values" added column + int lastIndex = buildSrcContainer.wrappers.size() - 1 ; + for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) { + ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); + ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector(); + destVector.copyEntry(recordCount, srcVector, buildSrcIndex); + } + return lastIndex; + } + private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex, int baseIndex) { + // int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size(); + for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) { + ValueVector destVector = wrappers.get(vectorIndex).getValueVector(); + ValueVector srcVector = probeSrcContainer.wrappers.get(vectorIndex - baseIndex).getValueVector(); + destVector.copyEntry(recordCount, srcVector, probeSrcIndex); + } + } + /** + * A special version of appendRow for the HashJoin; uses a composite index for the build side + * @param buildSrcContainers The containers list for the right side + * @param compositeBuildSrcIndex Composite build index + * @param probeSrcContainer The single container for the left/outer side 
+ * @param probeSrcIndex Index in the outer container + * @return Number of rows in this container (after the append) + */ + public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) { + int buildBatch = compositeBuildSrcIndex >>> 16; + int buildOffset = compositeBuildSrcIndex & 65535; + int baseInd = 0; + if ( buildSrcContainers != null ) { baseInd = appendBuild(buildSrcContainers.get(buildBatch), buildOffset); } + if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex, baseInd); } + recordCount++; + initialized = true; + return recordCount; + } + + /** + * A customised version of the special appendRow for HashJoin - used for Left + * Outer Join when there is no build side match - hence need a base index in + * this container's wrappers from where to start appending + * @param probeSrcContainer + * @param probeSrcIndex + * @param baseInd - index of this container's wrapper to start at + * @return + */ + public int appendOuterRow(VectorContainer probeSrcContainer, int probeSrcIndex, int baseInd) { + appendProbe(probeSrcContainer, probeSrcIndex, baseInd); + recordCount++; + initialized = true; + return recordCount; + } + public TypedFieldId add(ValueVector vv) { schemaChanged = true; schema = null; @@ -217,6 +324,11 @@ public class VectorContainer implements VectorAccessible { return new TypedFieldId(vv.getField().getType(), i); } + public ValueVector getLast() { + int sz = wrappers.size(); + if ( sz == 0 ) { return null; } + return wrappers.get(sz - 1).getValueVector(); + } public void add(ValueVector[] hyperVector) { add(hyperVector, true); } @@ -343,7 +455,8 @@ public class VectorContainer implements VectorAccessible { } public void setRecordCount(int recordCount) { - this.recordCount = recordCount; + this.recordCount = recordCount; + initialized = true; } @Override @@ -352,7 +465,7 @@ public class VectorContainer implements VectorAccessible { return recordCount; } - public boolean hasRecordCount() { return recordCount != -1; } + public boolean hasRecordCount() { return initialized; } @Override public SelectionVector2 getSelectionVector2() { @@ -418,6 +531,7 @@ public class VectorContainer implements VectorAccessible { merged.wrappers.addAll(wrappers); merged.wrappers.addAll(otherContainer.wrappers); merged.schemaChanged = false; + merged.initialized = true; return merged; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 7f02773..810fd37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -117,6 +117,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(PlannerSettings.JOIN_OPTIMIZATION), new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL), new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing + new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR), + new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR), + new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR), + new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR), + new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR), new 
OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR), new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR), new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b2013d5..f321691 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -244,6 +244,17 @@ drill.exec: { // if they do not exist. directories: [ "/tmp/drill/spill" ] }, + hashjoin: { + spill: { + // -- The 2 options below can be used to override the common ones + // -- (common to all spilling operators) + // File system to use. Local file system by default. + fs: ${drill.exec.spill.fs}, + // List of directories to use. Directories are created + // if they do not exist. + directories: ${drill.exec.spill.directories}, + } + }, hashagg: { spill: { // -- The 2 options below can be used to override the common ones @@ -427,9 +438,16 @@ drill.exec.options: { exec.enable_bulk_load_table_list: false, exec.enable_union_type: false, exec.errors.verbose: false, + exec.hashjoin.mem_limit: 0, + exec.hashjoin.num_partitions: 32, + exec.hashjoin.num_rows_in_batch: 1024, + exec.hashjoin.max_batches_in_memory: 128, + exec.hashjoin.max_batches_per_partition: 512, exec.hashagg.mem_limit: 0, exec.hashagg.min_batches_per_partition: 2, exec.hashagg.num_partitions: 32, + exec.hashagg.num_rows_in_batch: 128, + exec.hashagg.max_batches_in_memory: 65536, exec.hashagg.use_memory_prediction: true, exec.impersonation.inbound_policies: "[]", exec.java.compiler.exp_in_method_size: 50, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java new file mode 100644 index 0000000..4cfe6b4 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.collect.Lists; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.categories.OperatorTest; +import org.apache.drill.categories.SlowTest; + +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.List; + +@Category({SlowTest.class, OperatorTest.class}) +public class TestHashJoinSpill extends PhysicalOpUnitTestBase { + + + @SuppressWarnings("unchecked") + @Test + // Should spill, including recursive spill + public void testSimpleHashJoinSpill() { + HashJoinPOP joinConf = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 8); + // Put some duplicate values + List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]", + "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]"); + List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]", + "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]"); + int numRows = 2_500; + for ( int cnt = 1; cnt <= numRows; cnt++ ) { + leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]"); + rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]"); + } + + opTestBuilder() + .physicalOperator(joinConf) + .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable)) + .baselineColumns("lft", "a", "b", "rgt") + .expectedTotalRows( numRows + 9 ) + .go(); + } + + @SuppressWarnings("unchecked") + @Test + public void testRightOuterHashJoinSpill() { + HashJoinPOP joinConf = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.RIGHT); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 8); + // Put some duplicate values + List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]", + "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]"); + List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]", + "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]"); + int numRows = 8_000; + for ( int cnt = 1; cnt <= numRows; cnt++ ) { + // leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]"); + rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]"); + } + + opTestBuilder() + .physicalOperator(joinConf) + .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable)) + .baselineColumns("lft", "a", "b", "rgt") + .expectedTotalRows( numRows + 9 ) + .go(); + } + + @SuppressWarnings("unchecked") + @Test + public void testLeftOuterHashJoinSpill() { + HashJoinPOP joinConf = new HashJoinPOP(null, null, + Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.LEFT); + 
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 8); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch", 64); + operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory", 12); + // Put some duplicate values + List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a string\"}]", + "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : \"yet another\"}]"); + List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a string\"}]", + "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : \"yet another\"}]"); + int numRows = 4_000; // 100_000 + for ( int cnt = 1; cnt <= numRows / 2 ; cnt++ ) { // the inner (right) side gets only half the keys, to exercise the left-outer join + // leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]"); + rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]"); + } + for ( int cnt = 1; cnt <= numRows; cnt++ ) { + leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]"); + // rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]"); + } + + opTestBuilder() + .physicalOperator(joinConf) + .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable)) + .baselineColumns("lft", "a", "b", "rgt") + .expectedTotalRows( numRows + 9 ) + .go(); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index f45f558..6374f1f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -151,7 +151,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { } Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>(); - int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors); + int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors, null); if (expectBatchNum != null) { if (expectBatchNum != actualBatchNum) { throw new AssertionError(String.format("Expected %s batches from operator tree.
But operators return %s batch!", expectBatchNum, actualBatchNum)); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java index 4bab883..16081be 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -216,6 +216,7 @@ public class PhysicalOpUnitTestBase extends ExecTest { private boolean expectNoRows; private Long expectedBatchSize; private Integer expectedNumBatches; + private Integer expectedTotalRows; @SuppressWarnings({"unchecked", "resource"}) public void go() { @@ -235,7 +236,8 @@ public class PhysicalOpUnitTestBase extends ExecTest { testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams); - Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches); + Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches, expectedTotalRows); + if ( expectedTotalRows != null ) { return; } // when checking total rows, don't compare actual results Map<String, List<Object>> expectedSuperVectors; @@ -328,6 +330,11 @@ public class PhysicalOpUnitTestBase extends ExecTest { this.expectedBatchSize = batchSize; return this; } + + public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) { + this.expectedTotalRows = expectedTotalRows; + return this; + } } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java index b5450e6..57bc79c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java @@ -315,15 +315,16 @@ public class DrillTestWrapper { * Iterate over batches, and combine the batches into a map, where key is schema path, and value is * the list of column values across all the batches. 
* @param batches + * @param expectedTotalRecords * @return * @throws SchemaChangeException * @throws UnsupportedEncodingException */ public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, - Long expectedBatchSize, Integer expectedNumBatches) + Long expectedBatchSize, Integer expectedNumBatches, Integer expectedTotalRecords) throws SchemaChangeException, UnsupportedEncodingException { Map<String, List<Object>> combinedVectors = new TreeMap<>(); - addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors); + addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors, expectedTotalRecords); return combinedVectors; } @@ -340,7 +341,7 @@ public class DrillTestWrapper { */ public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Long expectedBatchSize, Integer expectedNumBatches, - Map<String, List<Object>> combinedVectors) + Map<String, List<Object>> combinedVectors, Integer expectedTotalRecords) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes int numBatch = 0; @@ -443,6 +444,9 @@ public class DrillTestWrapper { Assert.assertTrue(numBatch <= (2*expectedNumBatches)); } + if ( expectedTotalRecords != null ) { + Assert.assertEquals(expectedTotalRecords.longValue(), totalRecords); + } return numBatch; } @@ -562,7 +566,7 @@ public class DrillTestWrapper { addTypeInfoIfMissing(actual.get(0), testBuilder); BatchIterator batchIter = new BatchIterator(actual, loader); - actualSuperVectors = addToCombinedVectorResults(batchIter, null, null); + actualSuperVectors = addToCombinedVectorResults(batchIter, null, null, null); batchIter.close(); // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes @@ -575,7 +579,7 @@ public class DrillTestWrapper { test(baselineOptionSettingQueries); expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery()); BatchIterator exBatchIter = new BatchIterator(expected, loader); - expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null); + expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null, null); exBatchIter.close(); } } else { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java index 02156f6..a3cd918 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java @@ -105,4 +105,7 @@ public class RowSetBatch implements RecordBatch { public Iterator<VectorWrapper<?>> iterator() { return rowSet.container().iterator(); } + + @Override + public VectorContainer getContainer() { return rowSet.container(); } } diff --git a/exec/java-exec/src/test/resources/empty.json b/exec/java-exec/src/test/resources/empty.json new file mode 100644 index 0000000..e69de29 -- To stop receiving notification emails like this one, please contact b...@apache.org.
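On the composite row index that the new VectorContainer.appendRow (in the diff above) decodes: the upper 16 bits of compositeBuildSrcIndex select the build-side batch and the lower 16 bits give the row offset within that batch, so a single in-memory build batch can address at most 65536 rows. The stand-alone sketch below only illustrates that packing convention; the class and method names are made up for illustration and are not part of this patch.

// Illustrative sketch only -- the patch itself decodes the composite index inline in appendRow().
public final class CompositeIndexSketch {
  private CompositeIndexSketch() {}

  // Pack a build-side batch number and a row offset (0..65535) into a single int,
  // mirroring the ">>> 16" / "& 65535" decoding used by VectorContainer.appendRow.
  public static int pack(int batchIndex, int rowOffset) {
    assert rowOffset >= 0 && rowOffset <= 0xFFFF : "row offset must fit in 16 bits";
    return (batchIndex << 16) | rowOffset;
  }

  public static int batchOf(int composite)  { return composite >>> 16; }   // upper 16 bits
  public static int offsetOf(int composite) { return composite & 0xFFFF; } // lower 16 bits

  public static void main(String[] args) {
    int composite = pack(3, 1234);
    System.out.println(batchOf(composite) + " / " + offsetOf(composite)); // prints: 3 / 1234
  }
}

The 16-bit offset limit is consistent with the small per-batch row counts configured in drill-module.conf above (exec.hashjoin.num_rows_in_batch defaults to 1024, and the spill tests set it to 64).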