This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 13e72c1ed846a4c2a5ee18129e773a3e2c215cdc
Author: Ben-Zvi <bben-...@mapr.com>
AuthorDate: Wed Feb 14 12:36:39 2018 -0800

    DRILL-6027: Initial implementation of HashJoin spill, without memory limits 
checks yet
---
 .../test/java/org/apache/drill/test/DrillTest.java |   2 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |  14 +
 .../apache/drill/exec/cache/VectorSerializer.java  |   3 +-
 .../drill/exec/physical/impl/BaseRootExec.java     |   7 +-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +
 .../physical/impl/aggregate/HashAggTemplate.java   |  19 +-
 .../impl/aggregate/SpilledRecordbatch.java         |  11 +-
 .../physical/impl/common/ChainedHashTable.java     |  14 +-
 .../exec/physical/impl/common/HashPartition.java   | 500 +++++++++++
 .../drill/exec/physical/impl/common/HashTable.java |  10 +-
 .../physical/impl/common/HashTableTemplate.java    |  59 +-
 .../exec/physical/impl/join/HashJoinBatch.java     | 945 +++++++++++++++------
 .../exec/physical/impl/join/HashJoinHelper.java    |   4 +
 .../exec/physical/impl/join/HashJoinProbe.java     |  53 --
 .../physical/impl/join/HashJoinProbeTemplate.java  | 261 ------
 .../drill/exec/physical/impl/spill/SpillSet.java   |   5 +
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   5 +
 .../validate/IteratorValidatorBatchIterator.java   |   5 +
 .../drill/exec/record/AbstractRecordBatch.java     |   5 +
 .../org/apache/drill/exec/record/RecordBatch.java  |   7 +
 .../apache/drill/exec/record/SchemalessBatch.java  |   3 +
 .../drill/exec/record/SimpleRecordBatch.java       |   5 +
 .../apache/drill/exec/record/VectorContainer.java  | 120 ++-
 .../exec/server/options/SystemOptionManager.java   |   5 +
 .../java-exec/src/main/resources/drill-module.conf |  18 +
 .../exec/physical/impl/join/TestHashJoinSpill.java | 123 +++
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |   2 +-
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |   9 +-
 .../org/apache/drill/test/DrillTestWrapper.java    |  14 +-
 .../org/apache/drill/test/rowSet/RowSetBatch.java  |   3 +
 exec/java-exec/src/test/resources/empty.json       |   0
 31 files changed, 1612 insertions(+), 623 deletions(-)

diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java 
b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 24ec381..28544ab 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -56,7 +56,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;
 
-  @Rule public final TestRule TIMEOUT = new 
DisableOnDebug(TestTools.getTimeoutRule(100_000));
+  @Rule public final TestRule TIMEOUT = new 
DisableOnDebug(TestTools.getTimeoutRule(1_000_000));
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
 
   @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 671ce4b..57ecdf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -103,6 +103,20 @@ public final class ExecConstants {
 
   public static final BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = 
new BooleanValidator("exec.sort.disable_managed");
 
+  // Hash Join Options
+  public static final String HASHJOIN_NUM_ROWS_IN_BATCH_KEY = 
"exec.hashjoin.num_rows_in_batch";
+  public static final LongValidator HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR = new 
RangeLongValidator(HASHJOIN_NUM_ROWS_IN_BATCH_KEY, 1, 65536);
+  public static final String HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY = 
"exec.hashjoin.max_batches_in_memory";
+  public static final LongValidator HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR = 
new RangeLongValidator(HASHJOIN_MAX_BATCHES_IN_MEMORY_KEY, 1, 65536);
+  public static final String HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY = 
"exec.hashjoin.max_batches_per_partition";
+  public static final LongValidator 
HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHJOIN_MAX_BATCHES_PER_PARTITION_KEY, 1, 65536);
+  public static final String HASHJOIN_NUM_PARTITIONS_KEY = 
"exec.hashjoin.num_partitions";
+  public static final LongValidator HASHJOIN_NUM_PARTITIONS_VALIDATOR = new 
RangeLongValidator(HASHJOIN_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no 
spilling
+  public static final String HASHJOIN_MAX_MEMORY_KEY = 
"exec.hashjoin.mem_limit";
+  public static final LongValidator HASHJOIN_MAX_MEMORY_VALIDATOR = new 
RangeLongValidator(HASHJOIN_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
+  public static final String HASHJOIN_SPILL_DIRS = 
"drill.exec.hashjoin.spill.directories";
+  public static final String HASHJOIN_SPILL_FILESYSTEM = 
"drill.exec.hashjoin.spill.fs";
+
   // Hash Aggregate Options
   public static final String HASHAGG_NUM_PARTITIONS_KEY = 
"exec.hashagg.num_partitions";
   public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new 
RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
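The hash join options added above feed the new partitioning scheme: exec.hashjoin.num_partitions is kept at a power of two (the HashJoinBatch changes further below add partitionMask and bitsInMask fields for this), so a row's hash code can be split into a partition number plus a residual hash with a simple mask. A minimal, self-contained sketch of that arithmetic; the names and constants are illustrative only and are not the operator's actual code:

// Illustrative only: why the partition count is kept a power of two - the hash
// code then splits into a partition number and a residual hash with a mask
// (names mirror the partitionMask / bitsInMask fields in HashJoinBatch).
public class PartitionMaskSketch {
  public static void main(String[] args) {
    int numPartitions = 16;                           // e.g. exec.hashjoin.num_partitions
    int partitionMask = numPartitions - 1;            // 0x0000000F
    int bitsInMask = Integer.bitCount(partitionMask); // 4

    int hashCode = 0x5A3C9B17;
    int partition = hashCode & partitionMask;         // low bits pick the partition
    int residualHash = hashCode >>> bitsInMask;       // remaining bits usable within it
    System.out.printf("partition %d, residual hash 0x%08X%n", partition, residualHash);
  }
}
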
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
index 03ea11e..b5beb62 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
@@ -91,7 +91,7 @@ public class VectorSerializer {
 
       final DrillBuf[] incomingBuffers = batch.getBuffers();
       final UserBitShared.RecordBatchDef batchDef = batch.getDef();
-      bytesWritten = batchDef.getSerializedSize();
+      int bytesWritten = batchDef.getSerializedSize();
 
       /* Write the metadata to the file */
       batchDef.writeDelimitedTo(output);
@@ -115,6 +115,7 @@ public class VectorSerializer {
       }
 
       timeNs += timerContext.stop();
+      this.bytesWritten += bytesWritten;
       return bytesWritten;
     }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 76fd642..b18a78e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -139,12 +139,7 @@ public abstract class BaseRootExec implements RootExec {
 
     // close all operators.
     if (operators != null) {
-      final DeferredException df = new DeferredException(new 
Supplier<Exception>() {
-        @Override
-        public Exception get() {
-          return new RuntimeException("Error closing operators");
-        }
-      });
+      final DeferredException df = new DeferredException();
 
       for (final CloseableRecordBatch crb : operators) {
         df.suppressingClose(crb);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ea943b2..4a62752 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -487,6 +487,10 @@ public class ScanBatch implements CloseableRecordBatch {
                       this.getClass().getCanonicalName()));
   }
 
+  @Override
+  public VectorContainer getContainer() {
+    return container;
+  }
   /**
    * Verify list of implicit column values is valid input:
    *   - Either implicit column list is empty;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 4c54080..368fd2c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -179,7 +179,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     NUM_RESIZING,
     RESIZING_TIME_MS,
     NUM_PARTITIONS,
-    SPILLED_PARTITIONS, // number of partitions spilled to disk
+    SPILLED_PARTITIONS, // number of original partitions spilled to disk
     SPILL_MB,         // Number of MB of data spilled to disk. This amount is 
first written,
                       // then later re-read. So, disk I/O is twice this amount.
                       // For first phase aggr -- this is an estimate of the 
amount of data
@@ -187,8 +187,6 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
     ;
 
-    // duplicate for hash ag
-
     @Override
     public int metricId() {
       return ordinal();
@@ -201,8 +199,6 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     private int maxOccupiedIdx = -1;
     private int batchOutputCount = 0;
 
-    private int capacity = Integer.MAX_VALUE;
-
     @SuppressWarnings("resource")
     public BatchHolder() {
 
@@ -233,8 +229,6 @@ public abstract class HashAggTemplate implements 
HashAggregator {
             vector.allocateNew();
           }
 
-          capacity = Math.min(capacity, vector.getValueCapacity());
-
           aggrValuesContainer.add(vector);
         }
         success = true;
@@ -464,7 +458,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     // initialize every (per partition) entry in the arrays
     for (int i = 0; i < numPartitions; i++ ) {
       try {
-        this.htables[i] = 
baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i] = 
baseHashTable.createAndSetupHashTable(groupByOutFieldIds);
         this.htables[i].setMaxVarcharSize(maxColumnWidth);
       } catch (ClassTransformationException e) {
         throw UserException.unsupportedError(e)
@@ -490,12 +484,13 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   public RecordBatch getNewIncoming() { return newIncoming; }
 
   private void initializeSetup(RecordBatch newIncoming) throws 
SchemaChangeException, IOException {
-    baseHashTable.updateIncoming(newIncoming); // after a spill - a new 
incoming
+    baseHashTable.updateIncoming(newIncoming, null); // after a spill - a new 
incoming
     this.incoming = newIncoming;
     currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in 
this spill file
     nextPartitionToReturn = 0;
     for (int i = 0; i < numPartitions; i++ ) {
-      htables[i].reinit(newIncoming);
+      htables[i].updateIncoming(newIncoming.getContainer(), null);
+      htables[i].reset();
       if ( batchHolders[i] != null) {
         for (BatchHolder bh : batchHolders[i]) {
           bh.clear();
@@ -547,7 +542,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
     // multiply by the max number of rows in a batch to get the final 
estimated max size
     estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
-    // (When there are no aggr functions, use '1' as later code relies on this 
size being non-zero)
+    // (When there are no aggr functions, use '1' as later code relies on this 
size being non-zero)
     estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
 
@@ -1260,7 +1255,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     int hashCode;
     try {
       // htables[0].updateBatches();
-      hashCode = htables[0].getHashCode(incomingRowIdx);
+      hashCode = htables[0].getBuildHashCode(incomingRowIdx);
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException("Unexpected schema change", e);
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index c473b94..c78e2c0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -48,6 +48,7 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
   private SpillSet spillSet;
   private String spillFile;
   VectorAccessibleSerializable vas;
+  private IterOutcome initialOutcome;
 
   public SpilledRecordbatch(String spillFile, int spilledBatches, 
FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet 
spillSet) {
     this.context = context;
@@ -64,7 +65,7 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
       throw UserException.resourceError(e).build(HashAggBatch.logger);
     }
 
-    next(); // initialize the container
+    initialOutcome = next(); // initialize the container
   }
 
   @Override
@@ -107,6 +108,9 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
   public VectorContainer getOutgoingContainer() { return container; }
 
   @Override
+  public VectorContainer getContainer() { return container; }
+
+  @Override
   public int getRecordCount() { return container.getRecordCount(); }
 
   @Override
@@ -156,6 +160,11 @@ public class SpilledRecordbatch implements 
CloseableRecordBatch {
   }
 
   /**
+   *  Return the initial outcome (from the first next() call)
+   */
+  public IterOutcome getInitialOutcome() { return initialOutcome; }
+
+  /**
    * Note: ignoring any IO errors (e.g. file not found)
    */
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index e061bdd..89e32a8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -116,7 +116,7 @@ public class ChainedHashTable {
   private final FragmentContext context;
   private final BufferAllocator allocator;
   private RecordBatch incomingBuild;
-  private final RecordBatch incomingProbe;
+  private RecordBatch incomingProbe;
   private final RecordBatch outgoing;
 
   public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, 
BufferAllocator allocator,
@@ -130,11 +130,12 @@ public class ChainedHashTable {
     this.outgoing = outgoing;
   }
 
-  public void updateIncoming(RecordBatch incomingBuild) {
+  public void updateIncoming(RecordBatch incomingBuild, RecordBatch 
incomingProbe) {
     this.incomingBuild = incomingBuild;
+    this.incomingProbe = incomingProbe;
   }
 
-  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int 
numPartitions) throws ClassTransformationException,
+  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) 
throws ClassTransformationException,
       IOException, SchemaChangeException {
     CodeGenerator<HashTable> top = 
CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getOptions());
     top.plainJavaCapable(true);
@@ -225,14 +226,13 @@ public class ChainedHashTable {
     setupGetHash(cg /* use top level code generator for getHash */, 
GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
 
     HashTable ht = context.getImplementationClass(top);
-    ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, 
htContainerOrig);
+    ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, 
outgoing, htContainerOrig);
 
     return ht;
   }
 
   private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, 
MappingSet incomingMapping, MappingSet htableMapping,
-      LogicalExpression[] keyExprs, List<Comparator> comparators, 
TypedFieldId[] htKeyFieldIds)
-      throws SchemaChangeException {
+      LogicalExpression[] keyExprs, List<Comparator> comparators, 
TypedFieldId[] htKeyFieldIds) {
     cg.setMappingSet(incomingMapping);
 
     if (keyExprs == null || keyExprs.length == 0) {
@@ -276,7 +276,7 @@ public class ChainedHashTable {
   }
 
   private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] 
keyExprs,
-                             TypedFieldId[] htKeyFieldIds) throws 
SchemaChangeException {
+                             TypedFieldId[] htKeyFieldIds) {
 
     cg.setMappingSet(SetValueMapping);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
new file mode 100644
index 0000000..5b4adf1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ObjectVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *  The class HashPartition
+ *
+ *    Created to represent an active partition for the Hash-Join operator
+ *  (active means: currently receiving data, or its data is being probed; as 
opposed to fully
+ *   spilled partitions).
+ *    After all the build/inner data is read for this partition - if all its 
data is in memory, then
+ *  a hash table and a helper are created, and later this data would be probed.
+ *    If all this partition's build/inner data was spilled, then it begins to 
work as an outer
+ *  partition (see the flag "processingOuter") -- reusing some of the fields 
(e.g., currentBatch,
+ *  currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
+ */
+public class HashPartition {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashPartition.class);
+
+  private int partitionNum = -1; // the current number of this partition, as 
used by the operator
+
+  private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
+  private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control 
memory allocation for varchars
+
+  private final MajorType HVtype = MajorType.newBuilder()
+    .setMinorType(MinorType.INT /* dataType */ )
+    .setMode(DataMode.REQUIRED /* mode */ )
+    .build();
+
+  // The vector containers storing all the inner rows
+  // * Records are retrieved from these containers when there is a matching 
record
+  // * on the probe side
+  private ArrayList<VectorContainer> containers;
+
+  // While build data is incoming - temporarily keep the list of in-memory
+  // incoming batches, per each partition (these may be spilled at some point)
+  private List<VectorContainer> tmpBatchesList;
+  // A batch and HV vector to hold incoming rows - per each partition
+  private VectorContainer currentBatch; // The current (newest) batch
+  private IntVector currHVVector; // The HV vectors for the currentBatches
+
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper;
+
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable;
+
+  private VectorSerializer.Writer writer; // a vector writer for each spilled 
partition
+  private int partitionBatchesCount; // count number of batches spilled
+  private String spillFile;
+
+  private BufferAllocator allocator;
+  private FragmentContext context;
+  private int RECORDS_PER_BATCH;
+  ChainedHashTable baseHashTable;
+  private SpillSet spillSet;
+  private boolean isSpilled; // is this partition spilled ?
+  private boolean processingOuter; // is (inner done spilling and) now the 
outer is processed?
+  private boolean outerBatchNotNeeded; // when the inner is whole in memory
+  private RecordBatch buildBatch;
+  private RecordBatch probeBatch;
+  private HashJoinBatch.inMemBatchCounter inMemBatches; // shared among all 
partitions
+  private int cycleNum;
+
+  public HashPartition(FragmentContext context, BufferAllocator allocator, 
ChainedHashTable baseHashTable,
+                       RecordBatch buildBatch, RecordBatch probeBatch,
+                       int recordsPerBatch, SpillSet spillSet, int partNum,
+                       HashJoinBatch.inMemBatchCounter inMemBatches, int 
cycleNum) {
+    this.context = context;
+    this.allocator = allocator;
+    this.baseHashTable = baseHashTable;
+    this.buildBatch = buildBatch;
+    this.probeBatch = probeBatch;
+    this.RECORDS_PER_BATCH = recordsPerBatch;
+    this.spillSet = spillSet;
+    this.partitionNum = partNum;
+    this.inMemBatches = inMemBatches;
+    this.cycleNum = cycleNum;
+
+    try {
+      this.hashTable = baseHashTable.createAndSetupHashTable(null);
+      this.hashTable.setMaxVarcharSize(maxColumnWidth);
+    } catch (ClassTransformationException e) {
+      throw UserException.unsupportedError(e)
+        .message("Code generation error - likely an error in the code.")
+        .build(logger);
+    } catch (IOException e) {
+      throw UserException.resourceError(e)
+        .message("IO Error while creating a hash table.")
+        .build(logger);
+    } catch (SchemaChangeException sce) {
+      throw new IllegalStateException("Unexpected Schema Change while creating 
a hash table",sce);
+    }
+    this.hjHelper = new HashJoinHelper(context, allocator);
+    tmpBatchesList = new ArrayList<>();
+    allocateNewCurrentBatchAndHV();
+  }
+
+  /**
+   * Allocate a new vector container for either right or left record batch
+   * Add an additional special vector for the hash values
+   * Note: this call may OOM !!
+   * @param rb - either the right or the left record batch
+   * @return the new vector container
+   */
+  private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
+    VectorContainer newVC = new VectorContainer();
+    VectorContainer fromVC = rb.getContainer();
+    Iterator<VectorWrapper<?>> vci = fromVC.iterator();
+    boolean success = false;
+
+    try {
+      while (vci.hasNext()) {
+        VectorWrapper vw = vci.next();
+        // If processing a spilled container, skip the last column (HV)
+        if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
+        ValueVector vv = vw.getValueVector();
+        ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator);
+        newVC.add(newVV); // add first to allow dealloc in case of an OOM
+
+        if (newVV instanceof FixedWidthVector) {
+          ((FixedWidthVector) newVV).allocateNew(RECORDS_PER_BATCH);
+        } else if (newVV instanceof VariableWidthVector) {
+          // Need to check - (is this case ever used?) if a varchar falls 
under ObjectVector which is allocated on the heap !
+          ((VariableWidthVector) newVV).allocateNew(maxColumnWidth * 
RECORDS_PER_BATCH, RECORDS_PER_BATCH);
+        } else if (newVV instanceof ObjectVector) {
+          ((ObjectVector) newVV).allocateNew(RECORDS_PER_BATCH);
+        } else {
+          newVV.allocateNew();
+        }
+      }
+
+      newVC.setRecordCount(0);
+      inMemBatches.inc(); // one more batch in memory
+      success = true;
+    } finally {
+      if ( !success ) {
+        newVC.clear(); // in case of an OOM
+      }
+    }
+    return newVC;
+  }
+
+  /**
+   *  Allocate a new current Vector Container and current HV vector
+   */
+  public void allocateNewCurrentBatchAndHV() {
+    if ( outerBatchNotNeeded ) { return; } // skip when the inner is whole in 
memory
+    currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : 
buildBatch);
+    currHVVector = new IntVector(MaterializedField.create("Hash_Values", 
HVtype), allocator);
+    currHVVector.allocateNew(RECORDS_PER_BATCH);
+  }
+
+  /**
+   *  Append a build (inner) row into this partition's current batch; when
+   *  the batch fills up, complete it (and spill this partition if needed)
+   */
+  public void appendInnerRow(VectorContainer buildContainer, int ind, int 
hashCode, boolean needsSpill) {
+
+    int pos = currentBatch.appendRow(buildContainer,ind);
+    currHVVector.getMutator().set(pos, hashCode);   // store the hash value in 
the new column
+    if ( pos + 1 == RECORDS_PER_BATCH ) {
+      completeAnInnerBatch(true, needsSpill);
+    }
+  }
+
+  /**
+   *  Outer always spills when batch is full
+   *
+   */
+  public void appendOuterRow(int hashCode, int recordsProcessed) {
+    int pos = 
currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
+    currHVVector.getMutator().set(pos, hashCode);   // store the hash value in 
the new column
+    if ( pos + 1 == RECORDS_PER_BATCH ) {
+      completeAnOuterBatch(true);
+    }
+  }
+
+  public void completeAnOuterBatch(boolean toInitialize) {
+    completeABatch(toInitialize, true);
+  }
+  public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
+    completeABatch(toInitialize, needsSpill);
+  }
+  /**
+   *  A current batch is full (or no more rows are incoming) - complete the
+   *  processing of this batch: add it to this partition's tmp list, spill
+   *  that list if needed, and - if more rows are coming - allocate a new
+   *  current batch and HV vector for this partition
+   */
+  private void completeABatch(boolean toInitialize, boolean needsSpill) {
+    if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
+      currentBatch.add(currHVVector);
+      currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      tmpBatchesList.add(currentBatch);
+      partitionBatchesCount++;
+    } else {
+      freeCurrentBatchAndHVVector();
+    }
+    if ( needsSpill ) { // spill this batch/partition and free its memory
+      spillThisPartition(tmpBatchesList, processingOuter ? "outer" : "inner");
+    }
+    if ( toInitialize ) { // allocate a new batch and HV vector
+      allocateNewCurrentBatchAndHV();
+    } else {
+      currentBatch = null;
+      currHVVector = null;
+    }
+  }
+
+  private void spillThisPartition(List<VectorContainer> vcList, String side) {
+    if ( vcList.size() == 0 ) { return; } // in case empty - nothing to spill
+    logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size 
{} batches", partitionNum, cycleNum, vcList.size());
+
+    // If this is the first spill for this partition, create an output stream
+    if ( writer == null ) {
+      // A special case - when (outer is) empty
+      if ( vcList.get(0).getRecordCount() == 0 ) {
+        VectorContainer vc = vcList.remove(0);
+        inMemBatches.dec();
+        vc.zeroVectors();
+        return;
+      }
+      String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : 
side;
+      spillFile = spillSet.getNextSpillFile(suffix);
+
+      try {
+        writer = spillSet.writer(spillFile);
+      } catch (IOException ioe) {
+        throw UserException.resourceError(ioe)
+          .message("Hash Join failed to open spill file: " + spillFile)
+          .build(logger);
+      }
+
+      isSpilled = true;
+    }
+
+    while ( vcList.size() > 0 ) {
+      VectorContainer vc = vcList.remove(0);
+      inMemBatches.dec();
+
+      int numRecords = vc.getRecordCount();
+      if (numRecords == 0) { // Spilling should skip an empty batch
+        vc.zeroVectors();
+        continue;
+      }
+
+      // set the value count for outgoing batch value vectors
+      for (VectorWrapper<?> v : vc) {
+        v.getValueVector().getMutator().setValueCount(numRecords);
+      }
+
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numRecords, vc, 
false);
+      try {
+        writer.write(batch, null);
+      } catch (IOException ioe) {
+        throw UserException.dataWriteError(ioe)
+          .message("Hash Join failed to write to output file: " + spillFile)
+          .build(logger);
+      } finally {
+        batch.clear();
+      }
+      vc.zeroVectors();
+      logger.trace("HASH JOIN: Took {} us to spill {} records", 
writer.time(TimeUnit.MICROSECONDS), numRecords);
+
+    }
+  }
+
+  //
+  // ===== Methods to probe the hash table and to get indices out of the 
helper =======
+  //
+
+  public int probeForKey(int recordsProcessed, int hashCode) throws 
SchemaChangeException {
+    return hashTable.probeForKey(recordsProcessed, hashCode);
+  }
+  public int getStartIndex(int probeIndex) {
+    /* The current probe record has a key that matches. Get the index
+     * of the first row in the build side that matches the current key
+     */
+    int compositeIndex = hjHelper.getStartIndex(probeIndex);
+    /* Record in the build side at currentCompositeIdx has a matching record 
in the probe
+     * side. Set the bit corresponding to this index so if we are doing a FULL 
or RIGHT
+     * join we keep track of which records we need to project at the end
+     */
+    hjHelper.setRecordMatched(compositeIndex);
+    return compositeIndex;
+  }
+  public int getNextIndex(int compositeIndex) {
+    // in case of inner rows with duplicate keys, get the next one
+    return hjHelper.getNextIndex(compositeIndex);
+  }
+  public void setRecordMatched(int compositeIndex) {
+    hjHelper.setRecordMatched(compositeIndex);
+  }
+  public List<Integer> getNextUnmatchedIndex() {
+    return hjHelper.getNextUnmatchedIndex();
+  }
+  //
+  // 
=====================================================================================
+  //
+
+  public int getBuildHashCode(int ind) throws SchemaChangeException {
+    return hashTable.getBuildHashCode(ind);
+  }
+  public int getProbeHashCode(int ind) throws SchemaChangeException {
+    return hashTable.getProbeHashCode(ind);
+  }
+  public ArrayList<VectorContainer> getContainers() {
+    return containers;
+  }
+
+  public void updateBatches() throws SchemaChangeException {
+    hashTable.updateBatches();
+  }
+  public boolean isSpilled() {
+    return isSpilled;
+  }
+  public String getSpillFile() {
+    return spillFile;
+  }
+
+  public int getPartitionBatchesCount() {
+    return partitionBatchesCount;
+  }
+  public int getPartitionNum() {
+    return partitionNum;
+  }
+
+  private void freeCurrentBatchAndHVVector() {
+    if ( currentBatch != null ) {
+      inMemBatches.dec();
+      currentBatch.clear();
+      currentBatch = null;
+    }
+    if ( currHVVector != null ) {
+      currHVVector.clear();
+      currHVVector = null;
+    }
+  }
+
+  public void closeWriterAndDeleteFile() {
+    closeWriterInternal(true);
+  }
+  public void closeWriter() { // no deletion !!
+    closeWriterInternal(false);
+    processingOuter = true; // After the spill file was closed
+  }
+  /**
+   * Close the writer for this partition, if one exists
+   *
+   * @param doDeleteFile Also delete the associated file
+   */
+  private void closeWriterInternal(boolean doDeleteFile) {
+    try {
+      if ( writer != null ) {
+        spillSet.close(writer);
+      }
+      if ( doDeleteFile && spillFile != null ) {
+        spillSet.delete(spillFile);
+      }
+    } catch (IOException ioe) {
+      throw UserException.resourceError(ioe)
+        .message("IO Error while closing %s spill file %s",
+          doDeleteFile ? "and deleting" : "",
+          spillFile)
+        .build(logger);
+    }
+    spillFile = null;
+    writer = null;
+    partitionBatchesCount = 0;
+  }
+
+  /**
+   *  Build this partition's containers, hash table and join helper from the
+   *  in-memory batches (no-op if this partition was spilled)
+   */
+  public void buildContainersHashTableAndHelper() throws SchemaChangeException 
{
+    if ( isSpilled ) { return; } // no building for spilled partitions
+    containers = new ArrayList<>();
+    for (int curr = 0; curr < partitionBatchesCount; curr++) {
+      VectorContainer nextBatch = tmpBatchesList.get(curr);
+      final int currentRecordCount = nextBatch.getRecordCount();
+
+      // For every incoming build batch, we create a matching helper batch
+      hjHelper.addNewBatch(currentRecordCount);
+
+      // Holder contains the global index where the key is hashed into using 
the hash table
+      final IndexPointer htIndex = new IndexPointer();
+
+      assert nextBatch != null;
+      assert probeBatch != null;
+
+      hashTable.updateIncoming(nextBatch, probeBatch );
+
+      // IntVector HV_vector = (IntVector) 
nextBatch.getValueVector(rightHVColPosition).getValueVector();
+      IntVector HV_vector = (IntVector) nextBatch.getLast();
+
+      for (int recInd = 0; recInd < currentRecordCount; recInd++) {
+        int hashCode = HV_vector.getAccessor().get(recInd);
+        try {
+          hashTable.put(recInd, htIndex, hashCode);
+        } catch (RetryAfterSpillException RE) {
+          throw new OutOfMemoryException("HT put");
+        } // Hash Join can not retry yet
+        /* Use the global index returned by the hash table, to store
+         * the current record index and batch index. This will be used
+         * later when we probe and find a match.
+         */
+        hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, 
recInd);
+      }
+
+      containers.add(nextBatch);
+    }
+    outerBatchNotNeeded = true; // the inner is whole in memory, no need for 
an outer batch
+  }
+
+  public void getStats(HashTableStats newStats) {
+    hashTable.getStats(newStats);
+  }
+
+  public void clearHashTableAndHelper() {
+    if (hashTable != null) {
+      hashTable.clear();
+      hashTable = null;
+    }
+    if (hjHelper != null) {
+      hjHelper.clear();
+      hjHelper = null;
+    }
+  }
+
+  public void close() {
+    freeCurrentBatchAndHVVector();
+    if (containers != null && !containers.isEmpty()) {
+      for (VectorContainer vc : containers) {
+        vc.clear();
+      }
+    }
+    while ( tmpBatchesList.size() > 0 ) {
+      VectorContainer vc = tmpBatchesList.remove(0);
+      inMemBatches.dec();
+      vc.clear();
+    }
+    closeWriter();
+    partitionBatchesCount = 0;
+    spillFile = null;
+    clearHashTableAndHelper();
+    if ( containers != null ) { containers.clear(); }
+  }
+
+} // class HashPartition
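To make the role of HashPartition easier to follow, here is a small, self-contained sketch (plain Java collections, no Drill types, not part of this patch) of the partitioned build/probe flow the class supports: buffer build rows per partition, build a per-partition table once the build side is exhausted, then route each probe row to the same partition and look up its key. In the real operator the buffered batches may be spilled, and the lookup goes through the generated HashTable and the HashJoinHelper.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the per-partition build/probe idea; not Drill code.
public class PartitionedJoinSketch {
  static final int NUM_PARTITIONS = 4;        // a power of two, as in the operator
  static final int MASK = NUM_PARTITIONS - 1;

  public static void main(String[] args) {
    // Build (inner) side: buffer each row in the partition chosen by its key's hash
    List<List<int[]>> buildRows = new ArrayList<>();
    for (int i = 0; i < NUM_PARTITIONS; i++) { buildRows.add(new ArrayList<>()); }
    int[][] build = { {1, 100}, {2, 200}, {5, 500} };   // {key, payload}
    for (int[] row : build) {
      buildRows.get(Integer.hashCode(row[0]) & MASK).add(row);
    }

    // Once the build side is read: per partition, build a lookup table over the
    // buffered rows (buildContainersHashTableAndHelper() plays this role above,
    // and is skipped for partitions that were spilled)
    List<Map<Integer, int[]>> tables = new ArrayList<>();
    for (List<int[]> rows : buildRows) {
      Map<Integer, int[]> table = new HashMap<>();
      for (int[] row : rows) { table.put(row[0], row); }
      tables.add(table);
    }

    // Probe (outer) side: same hash, same partition, then look the key up
    for (int key : new int[] { 2, 3, 5 }) {
      int[] match = tables.get(Integer.hashCode(key) & MASK).get(key);
      System.out.println(key + " -> " + (match == null ? "no match" : "payload " + match[1]));
    }
  }
}
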
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index a5eb1f2..ed8f388 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -46,16 +46,18 @@ public interface HashTable {
   int BATCH_SIZE = Character.MAX_VALUE + 1;
   int BATCH_MASK = 0x0000FFFF;
 
-  void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch 
incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
+  void setup(HashTableConfig htConfig, BufferAllocator allocator, 
VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
              VectorContainer htContainerOrig);
 
   void updateBatches() throws SchemaChangeException;
 
-  int getHashCode(int incomingRowIdx) throws SchemaChangeException;
+  int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException;
+
+  int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException;
 
   PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) 
throws SchemaChangeException, RetryAfterSpillException;
 
-  int containsKey(int incomingRowIdx, boolean isProbe) throws 
SchemaChangeException;
+  int probeForKey(int incomingRowIdx, int hashCode) throws 
SchemaChangeException;
 
   void getStats(HashTableStats stats);
 
@@ -65,7 +67,7 @@ public interface HashTable {
 
   void clear();
 
-  void reinit(RecordBatch newIncoming);
+  public void updateIncoming(VectorContainer newIncoming, RecordBatch 
newIncomingProbe);
 
   void reset();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 64f8144..b7a7f7b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -79,7 +79,7 @@ public abstract class HashTableTemplate implements HashTable {
   private BufferAllocator allocator;
 
   // The incoming build side record batch
-  private RecordBatch incomingBuild;
+  private VectorContainer incomingBuild;
 
   // The incoming probe side record batch (may be null)
   private RecordBatch incomingProbe;
@@ -417,7 +417,7 @@ public abstract class HashTableTemplate implements 
HashTable {
 
     @RuntimeOverridden
     protected void setupInterior(
-        @Named("incomingBuild") RecordBatch incomingBuild,
+        @Named("incomingBuild") VectorContainer incomingBuild,
         @Named("incomingProbe") RecordBatch incomingProbe,
         @Named("outgoing") RecordBatch outgoing,
         @Named("htContainer") VectorContainer htContainer) throws 
SchemaChangeException {
@@ -447,7 +447,7 @@ public abstract class HashTableTemplate implements 
HashTable {
 
 
   @Override
-  public void setup(HashTableConfig htConfig, BufferAllocator allocator, 
RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, 
VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, BufferAllocator allocator, 
VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, 
VectorContainer htContainerOrig) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -572,10 +572,31 @@ public abstract class HashTableTemplate implements 
HashTable {
     throw new RetryAfterSpillException();
   }
 
-  public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+  /**
+   *   Return the Hash Value for the row in the Build incoming batch at index:
+   *   (For Hash Aggregate there's no "Build" side -- only one batch - this 
one)
+   *
+   * @param incomingRowIdx
+   * @return
+   * @throws SchemaChangeException
+   */
+  @Override
+  public int getBuildHashCode(int incomingRowIdx) throws SchemaChangeException 
{
     return getHashBuild(incomingRowIdx, 0);
   }
 
+  /**
+   *   Return the Hash Value for the row in the Probe incoming batch at index:
+   *
+   * @param incomingRowIdx
+   * @return
+   * @throws SchemaChangeException
+   */
+  @Override
+  public int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException 
{
+    return getHashProbe(incomingRowIdx, 0);
+  }
+
   /** put() uses the hash code (from gethashCode() above) to insert the key(s) 
from the incoming
    * row into the hash table. The code selects the bucket in the startIndices, 
then the keys are
    * placed into the chained list - by storing the key values into a batch, 
and updating its
@@ -585,7 +606,7 @@ public abstract class HashTableTemplate implements 
HashTable {
    *
    * @param incomingRowIdx - position of the incoming row
    * @param htIdxHolder - to return batch + batch-offset (for caller to manage 
a matching batch)
-   * @param hashCode - computed over the key(s) by calling getHashCode()
+   * @param hashCode - computed over the key(s) by calling getBuildHashCode()
    * @return Status - the key(s) was ADDED or was already PRESENT
    */
   @Override
@@ -659,17 +680,24 @@ public abstract class HashTableTemplate implements 
HashTable {
         PutStatus.KEY_ADDED;     // otherwise
   }
 
-  // Return -1 if key is not found in the hash table. Otherwise, return the 
global index of the key
-  @Override
-  public int containsKey(int incomingRowIdx, boolean isProbe) throws 
SchemaChangeException {
-    int seedValue = 0;
-    int hash = isProbe ? getHashProbe(incomingRowIdx, seedValue) : 
getHashBuild(incomingRowIdx, seedValue);
-    int bucketIndex = getBucketIndex(hash, numBuckets());
+  /**
+   * Return -1 if Probe-side key is not found in the (build-side) hash table.
+   * Otherwise, return the global index of the key
+   *
+   *
+   * @param incomingRowIdx
+   * @param hashCode - The hash code for the Probe-side key
+   * @return -1 if key is not found, else return the global index of the key
+   * @throws SchemaChangeException
+   */
+   @Override
+  public int probeForKey(int incomingRowIdx, int hashCode) throws 
SchemaChangeException {
+    int bucketIndex = getBucketIndex(hashCode, numBuckets());
 
     for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex);
           currentIdxHolder.value != EMPTY_SLOT; ) {
       BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & 
BATCH_MASK);
-      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
+      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, true /* isProbe */)) 
{
         return currentIdxHolder.value;
       }
     }
@@ -776,9 +804,10 @@ public abstract class HashTableTemplate implements 
HashTable {
     batchHolders = new ArrayList<BatchHolder>();
     startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
   }
-  public void reinit(RecordBatch newIncoming) {
+  public void updateIncoming(VectorContainer newIncoming, RecordBatch 
newIncomingProbe) {
     incomingBuild = newIncoming;
-    reset();
+    incomingProbe = newIncomingProbe;
+    // reset();
     try {
       updateBatches();  // Needed to update the value vectors in the generated 
code with the new incoming
     } catch (SchemaChangeException e) {
@@ -806,7 +835,7 @@ public abstract class HashTableTemplate implements 
HashTable {
   public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
 
   // These methods will be code-generated in the context of the outer class
-  protected abstract void doSetup(@Named("incomingBuild") RecordBatch 
incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws 
SchemaChangeException;
+  protected abstract void doSetup(@Named("incomingBuild") VectorContainer 
incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws 
SchemaChangeException;
 
   protected abstract int getHashBuild(@Named("incomingRowIdx") int 
incomingRowIdx, @Named("seedValue") int seedValue) throws SchemaChangeException;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index b126255..998e0b1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,9 +20,12 @@ package org.apache.drill.exec.physical.impl.join;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
@@ -31,43 +34,37 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.IndexPointer;
 import org.apache.drill.exec.physical.impl.common.Comparator;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
-
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 
1000;
+  protected static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
+
+  private int RECORDS_PER_BATCH = 128; // 1024; // internal batches
+  private static final int TARGET_RECORDS_PER_BATCH = 4000;
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
@@ -77,84 +74,84 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
 
   private final List<Comparator> comparators;
 
-  // Runtime generated class implementing HashJoinProbe interface
-  private HashJoinProbe hashJoinProbe = null;
+  // Fields used for partitioning
+  private int numPartitions = 1; // must be 2 to the power of bitsInMask (set 
in setup())
+  private int partitionMask = 0; // numPartitions - 1
+  private int bitsInMask = 0; // number of bits in the MASK
+  private ChainedHashTable baseHashTable;
+  private boolean buildSideIsEmpty = true;
+  private boolean canSpill = true;
+  private boolean wasKilled; // a kill was received, may need to clean spilled 
partns
 
-  /* Helper class
-   * Maintains linked list of build side records with the same key
-   * Keeps information about which build records have a corresponding
-   * matching key in the probe side (for outer, right joins)
-   */
-  private HashJoinHelper hjHelper = null;
-
-  // Underlying hashtable used by the hash join
-  private HashTable hashTable = null;
-
-  /* Hyper container to store all build side record batches.
-   * Records are retrieved from this container when there is a matching record
-   * on the probe side
-   */
-  private ExpandableHyperContainer hyperContainer;
+  HashPartition partitions[];
 
   // Number of records in the output container
   private int outputRecords;
 
-  // Current batch index on the build side
-  private int buildBatchIndex = 0;
-
   // Schema of the build side
   private BatchSchema rightSchema = null;
 
+  private final HashTableStats htStats = new HashTableStats();
+
+  private final MajorType HVtype = MajorType.newBuilder()
+    .setMinorType(org.apache.drill.common.types.TypeProtos.MinorType.INT /* 
dataType */ )
+    .setMode(DataMode.REQUIRED /* mode */ )
+    .build();
+
+  private int rightHVColPosition;
+  private BufferAllocator allocator;
+  // Local fields for left/right incoming - may be replaced when reading from 
spilled
+  private RecordBatch buildBatch;
+  private RecordBatch probeBatch;
+
+  // For handling spilling
+  private SpillSet spillSet;
+  HashJoinPOP popConfig;
+
+  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int originalPartition = -1; // the partition a secondary reads from
+  IntVector read_HV_vector; // HV vector that was read from the spilled batch
+  private int MAX_BATCHES_IN_MEMORY;
+  private int MAX_BATCHES_PER_PARTITION;
+
+  public class inMemBatchCounter {
+    private int inMemBatches;
+    public void inc() { inMemBatches++; }
+    public void dec() { inMemBatches--; }
+    public int value() { return inMemBatches; }
+  }
+  public inMemBatchCounter inMemBatches = new inMemBatchCounter();
 
-  // Generator mapping for the build side
-  // Generator mapping for the build side : scalar
-  private static final GeneratorMapping PROJECT_BUILD =
-      GeneratorMapping.create("doSetup"/* setup method */, 
"projectBuildRecord" /* eval method */, null /* reset */,
-          null /* cleanup */);
-  // Generator mapping for the build side : constant
-  private static final GeneratorMapping PROJECT_BUILD_CONSTANT = 
GeneratorMapping.create("doSetup"/* setup method */,
-      "doSetup" /* eval method */,
-      null /* reset */, null /* cleanup */);
-
-  // Generator mapping for the probe side : scalar
-  private static final GeneratorMapping PROJECT_PROBE =
-      GeneratorMapping.create("doSetup" /* setup method */, 
"projectProbeRecord" /* eval method */, null /* reset */,
-          null /* cleanup */);
-  // Generator mapping for the probe side : constant
-  private static final GeneratorMapping PROJECT_PROBE_CONSTANT = 
GeneratorMapping.create("doSetup" /* setup method */,
-      "doSetup" /* eval method */,
-      null /* reset */, null /* cleanup */);
-
-
-  // Mapping set for the build side
-  private final MappingSet projectBuildMapping =
-      new MappingSet("buildIndex" /* read index */, "outIndex" /* write index 
*/, "buildBatch" /* read container */,
-          "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, 
PROJECT_BUILD);
-
-  // Mapping set for the probe side
-  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" 
/* read index */, "outIndex" /* write index */,
-      "probeBatch" /* read container */,
-      "outgoing" /* write container */,
-      PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
-
-  // indicates if we have previously returned an output batch
-  boolean firstOutputBatch = true;
+  private static class HJSpilledPartition {
+    public int innerSpilledBatches;
+    public String innerSpillFile;
+    public int outerSpilledBatches;
+    public String outerSpillFile;
+    int cycleNum;
+    int origPartn;
+    int prevOrigPartn; }
 
-  private final HashTableStats htStats = new HashTableStats();
+  private ArrayList<HJSpilledPartition> spilledPartitionsList;
+  private HJSpilledPartition spilledInners[]; // for the outer to find the 
partition
 
+  private int operatorId; // for the spill file name
   public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
-    RESIZING_TIME_MS;
+    RESIZING_TIME_MS,
+    NUM_PARTITIONS,
+    SPILLED_PARTITIONS, // number of original partitions spilled to disk
+    SPILL_MB,         // Number of MB of data spilled to disk. This amount is 
first written,
+                      // then later re-read. So, disk I/O is twice this amount.
+    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    ;
 
     // duplicate for hash ag
 
     @Override
-    public int metricId() {
-      return ordinal();
-    }
+    public int metricId() { return ordinal(); }
   }
 
   @Override
@@ -169,31 +166,16 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     }
 
     // Initialize the hash join helper context
-    hjHelper = new HashJoinHelper(context, oContext.getAllocator());
-    try {
-      rightSchema = right.getSchema();
-      final VectorContainer vectors = new VectorContainer(oContext);
-      for (final VectorWrapper<?> w : right) {
-        vectors.addOrGet(w.getField());
-      }
-      vectors.buildSchema(SelectionVectorMode.NONE);
-      vectors.setRecordCount(0);
-      hyperContainer = new ExpandableHyperContainer(vectors);
-      hjHelper.addNewBatch(0);
-      buildBatchIndex++;
-      if (isFurtherProcessingRequired(rightUpstream) && 
this.right.getRecordCount() > 0) {
-        setupHashTable();
-      }
-      hashJoinProbe = setupHashJoinProbe();
-      // Build the container schema and set the counts
-      for (final VectorWrapper<?> w : container) {
-        w.getValueVector().allocateNew();
-      }
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      container.setRecordCount(outputRecords);
-    } catch (IOException | ClassTransformationException e) {
-      throw new SchemaChangeException(e);
+    if (rightUpstream != IterOutcome.NONE) {
+      setupHashTable();
     }
+    setupOutputContainerSchema();
+    // Build the container schema and set the counts
+    for (final VectorWrapper<?> w : container) {
+      w.getValueVector().allocateNew();
+    }
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setRecordCount(outputRecords);
   }
 
   @Override
@@ -205,19 +187,21 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
       if (state == BatchState.FIRST) {
         // Build the hash table, using the build side record batches.
         executeBuildPhase();
-        hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 
left.getRecordCount(), this, hashTable,
-            hjHelper, joinType, leftUpstream);
         // Update the hash table related stats for the operator
-        updateStats(this.hashTable);
+        updateStats();
+        //
+        setupProbe();
       }
 
       // Store the number of records projected
-      if ((hashTable != null && !hashTable.isEmpty()) || joinType != 
JoinRelType.INNER) {
+
+      if ( ! buildSideIsEmpty ||  // If there are build-side rows
+           joinType != JoinRelType.INNER) {  // or if this is a left/full outer join
 
         // Allocate the memory for the vectors in the output container
         allocateVectors();
 
-        outputRecords = hashJoinProbe.probeAndProject();
+        outputRecords = probeAndProject();
 
         /* We are here because of one the following
          * 1. Completed processing of all the records and we are done
@@ -235,34 +219,94 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
           return IterOutcome.OK;
         }
+
+        // Free all partitions' in-memory data structures
+        // (In case need to start processing spilled partitions)
+        for ( HashPartition partn : partitions ) {
+          partn.close();
+        }
+
+        //
+        //  (recursively) Handle the spilled partitions, if any
+        //
+        if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) {
+          // Get the next (previously) spilled partition to handle as incoming
+          HJSpilledPartition currSp = spilledPartitionsList.remove(0);
+
+          // Create a BUILD-side "incoming" out of the inner spill file of that partition
+          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet);
+          // The above ctor call also got the first batch; need to update the outcome
+          rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+          if ( currSp.outerSpilledBatches > 0 ) {
+            // Create a PROBE-side "incoming" out of the outer spill file of that partition
+            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+            // The above ctor call also got the first batch; need to update the outcome
+            leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+          } else {
+            probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+            leftUpstream = IterOutcome.NONE;
+            changeToFinalProbeState();
+          }
+
+          // update the cycle num if needed
+          // The current cycle num should always be one larger than in the spilled partition
+          if (cycleNum == currSp.cycleNum) {
+            cycleNum = 1 + currSp.cycleNum;
+            stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+            // report first spill or memory stressful situations
+            if (cycleNum == 1) { logger.info("Started reading spilled records "); }
+            if (cycleNum == 2) { logger.info("SECONDARY SPILLING "); }
+            if (cycleNum == 3) { logger.warn("TERTIARY SPILLING ");  }
+            if (cycleNum == 4) { logger.warn("QUATERNARY SPILLING "); }
+            if (cycleNum == 5) { logger.warn("QUINARY SPILLING "); }
+            if ( cycleNum * bitsInMask > 20 ) {
+              spilledPartitionsList.add(currSp); // so cleanup() would delete the curr spill files
+              this.cleanup();
+              throw UserException
+                .unsupportedError()
+                .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)")
+                .build(logger);
+            }
+          }
+          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
+
+          state = BatchState.FIRST;  // build again, initialize probe, etc
+
+          return innerNext(); // start processing the next spilled partition "recursively"
+        }
+
       } else {
        // Our build side is empty, we won't have any matches, clear the probe side
        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : left) {
+          for (final VectorWrapper<?> wrapper : probeBatch) {
             wrapper.getValueVector().clear();
           }
-          left.kill(true);
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
+          probeBatch.kill(true);
+          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-            for (final VectorWrapper<?> wrapper : left) {
+            for (final VectorWrapper<?> wrapper : probeBatch) {
               wrapper.getValueVector().clear();
             }
-            leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
+            leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
           }
         }
       }
 
       // No more output records, clean up and return
       state = BatchState.DONE;
+
+      this.cleanup();
+
       return IterOutcome.NONE;
-    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+    } catch (SchemaChangeException e) {
       context.getExecutorState().fail(e);
       killIncoming(false);
       return IterOutcome.STOP;
     }
   }
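
The `cycleNum * bitsInMask > 20` guard in innerNext() above bounds how many times a partition can be re-spilled: each cycle consumes bitsInMask bits of the 32-bit hash value, and once roughly 20 bits have been used up the operator gives up (usually a sign of extreme join-key duplication). A rough standalone sketch of that arithmetic, with invented names, not taken from the patch:

    // Rough sketch only (invented class); mirrors the "cycleNum * bitsInMask > 20" guard above.
    public class SpillDepthSketch {
      // How many spill cycles fit before the guard fires, given the hash bits consumed per cycle.
      static int maxCycles(int bitsInMask) {
        return 20 / bitsInMask;
      }

      public static void main(String[] args) {
        System.out.println(maxCycles(5)); // 32 partitions: up to 4 cycles (through QUATERNARY)
        System.out.println(maxCycles(3)); // 8 partitions: up to 6 cycles
      }
    }
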
 
-  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+  private void setupHashTable() throws SchemaChangeException {
     // Setup the hash table configuration object
     int conditionsSize = conditions.size();
     final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
@@ -278,43 +322,103 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
       leftExpr = null;
     } else {
-      if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-        final String errorMsg = new StringBuilder()
-            .append("Hash join does not support probe batch with selection vectors. ")
-            .append("Probe batch has selection mode = ")
-            .append(left.getSchema().getSelectionVectorMode())
-            .toString();
+      if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        final String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
+          (probeBatch.getSchema().getSelectionVectorMode()).toString();
         throw new SchemaChangeException(errorMsg);
       }
     }
+    final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
 
-    final HashTableConfig htConfig =
-        new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-            HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
 
     // Create the chained hash table
-    final ChainedHashTable ht =
-        new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
-    hashTable = ht.createAndSetupHashTable(null, 1);
+    baseHashTable =
+      new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
   }
 
-  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-    //Setup the underlying hash table
+  /**
+   *  Call only after num partitions is known
+   */
+  private void delayedSetup() {
+    //
+    //  Find out the estimated max batch size, etc
+    //  and compute the max numPartitions possible
+    //
+    // numPartitions = 8; // just for initial work; change later
+    // partitionMask = 7;
+    // bitsInMask = 3;
+
+    //  SET FROM CONFIGURATION OPTIONS :
+    //  ================================
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
+    if ( numPartitions == 1 ) { //
+      canSpill = false;
+      logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+    }
+    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
+
+    MAX_BATCHES_IN_MEMORY = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+    MAX_BATCHES_PER_PARTITION = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR);
+
+    //  =================================
+
+    // Create the FIFO list of spilled partitions (pairs - inner/outer)
+    spilledPartitionsList = new ArrayList<>();
+
+    // Create array for the partitions
+    partitions = new HashPartition[numPartitions];
+
+    buildSideIsEmpty = false;
+  }
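
The mask arithmetic in delayedSetup() drives the whole partitioning scheme: the low bitsInMask bits of a row's hash select its partition, and the remaining bits are carried along (in the HV column) so a spilled partition can be re-partitioned on the next cycle without recomputing the hash. A small self-contained illustration, with invented names and assuming numPartitions is already a power of two, not taken from the patch:

    // Illustration only: split a 32-bit hash into a partition selector and a carried remainder.
    public class PartitionMaskSketch {
      static void route(int hashCode, int numPartitions) {
        int partitionMask = numPartitions - 1;            // e.g. 32 --> 0x1F
        int bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F --> 5

        int partition = hashCode & partitionMask; // which partition receives the row
        int carried   = hashCode >>> bitsInMask;  // what would be stored in the HV column

        System.out.printf("hash=0x%08X -> partition=%d, carried=0x%08X%n",
            hashCode, partition, carried);
      }

      public static void main(String[] args) {
        route(0xCAFEBABE, 32);
        route(0x12345678, 32);
      }
    }
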
+
+  /**
+   * Initialize fields (that may be reused when reading spilled partitions)
+   */
+  private void initializeBuild() {
+    assert inMemBatches.value() == 0; // check that no in-memory batches left
+    baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
+    // Recreate the partitions every time build is initialized
+    for (int part = 0; part < numPartitions; part++ ) {
+      partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
+        RECORDS_PER_BATCH, spillSet, part, inMemBatches, cycleNum);
+    }
+
+    spilledInners = new HJSpilledPartition[numPartitions];
+
+  }
+  /**
+   *  Execute the BUILD phase; first read incoming and split rows into partitions;
+   *  may decide to spill some of the partitions
+   *
+   * @throws SchemaChangeException
+   */
+  public void executeBuildPhase() throws SchemaChangeException {
+    final HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = new HashJoinMemoryCalculatorImpl().next();
+    boolean hasProbeData = leftUpstream != IterOutcome.NONE;
+
+    if ( rightUpstream == IterOutcome.NONE ) { return; } // empty right
 
     // skip first batch if count is zero, as it may be an empty schema batch
-    if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
-      for (final VectorWrapper<?> w : right) {
+    if (false && buildBatch.getRecordCount() == 0) {
+      for (final VectorWrapper<?> w : buildBatch) {
         w.clear();
       }
-      rightUpstream = next(right);
-      if (isFurtherProcessingRequired(rightUpstream) &&
-          right.getRecordCount() > 0 && hashTable == null) {
-        setupHashTable();
-      }
+      rightUpstream = next(buildBatch);
     }
 
-    boolean moreData = true;
+    //Setup the underlying hash table
+    if ( cycleNum == 0 ) { delayedSetup(); } // first time only
+
+    initializeBuild();
 
+    boolean moreData = true;
     while (moreData) {
       switch (rightUpstream) {
       case OUT_OF_MEMORY:
@@ -325,90 +429,123 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         continue;
 
       case OK_NEW_SCHEMA:
-        if (rightSchema == null) {
-          rightSchema = right.getSchema();
-
-          if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-            final String errorMsg = new StringBuilder()
-                .append("Hash join does not support build batch with selection vectors. ")
-                .append("Build batch has selection mode = ")
-                .append(left.getSchema().getSelectionVectorMode())
-                .toString();
-
-            throw new SchemaChangeException(errorMsg);
-          }
-          setupHashTable();
-        } else {
-          if (!rightSchema.equals(right.getSchema())) {
-            throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, right.getSchema());
-          }
-          hashTable.updateBatches();
+        if (!rightSchema.equals(buildBatch.getSchema())) {
+          throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema());
         }
+        for (HashPartition partn : partitions) { partn.updateBatches(); }
         // Fall through
       case OK:
-        final int currentRecordCount = right.getRecordCount();
+        final int currentRecordCount = buildBatch.getRecordCount();
+
+        if ( cycleNum > 0 ) {
+          read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+        }
+        // For every record in the build batch, hash the key columns and keep the result
+        for (int ind = 0; ind < currentRecordCount; ind++) {
+          int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
+            : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
+          int currPart = hashCode & partitionMask ;
+          hashCode >>>= bitsInMask;
+/*
+          int pos = currentBatches[currPart].appendRow(buildBatch.getContainer(),ind);
+          currHVVectors[currPart].getMutator().set(pos, hashCode);   // store the hash value in the new column
+          if ( pos + 1 == RECORDS_PER_BATCH ) {
+            // The current decision on when-to-spill is crude
+            completeAnInnerBatch(currPart,true,
+              isSpilled(currPart) ||  // once spilled - then spill every new full batch
+                canSpill &&
+                  ( inMemBatches > MAX_BATCHES_IN_MEMORY ||
+                    tmpBatchesList[currPart].size() > MAX_BATCHES_PER_PARTITION ));
+          }
+*/
+
+          // Append the new inner row to the appropriate partition; spill (that partition) if needed
+          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode,
+            // The current decision on when-to-spill is crude ...
+            partitions[currPart].isSpilled() || // once spilled - then spill every new full batch
+            canSpill &&
+              ( inMemBatches.value() > MAX_BATCHES_IN_MEMORY ||
+                partitions[currPart].getPartitionBatchesCount() > MAX_BATCHES_PER_PARTITION ) ); // may spill if needed
+        }
+
+        if ( read_HV_vector != null ) {
+          read_HV_vector.clear();
+          read_HV_vector = null;
+        }
+        break;
+      }
+      // Get the next incoming record batch
+      rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
+    }
+
+    // Move the remaining current batches into their temp lists, or spill
+    // them if the partition is spilled. Add the spilled partitions into
+    // the spilled partitions list
+    for (HashPartition partn : partitions) {
+      partn.completeAnInnerBatch(false, partn.isSpilled() );
+      if ( partn.isSpilled() ) {
+        HJSpilledPartition sp = new HJSpilledPartition();
+        sp.innerSpillFile = partn.getSpillFile();
+        sp.innerSpilledBatches = partn.getPartitionBatchesCount();
+        sp.cycleNum = cycleNum; // remember the current cycle
+        sp.origPartn = partn.getPartitionNum(); // for debugging / filename
+        sp.prevOrigPartn = originalPartition; // for debugging / filename
+        spilledPartitionsList.add(sp);
+
+        spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
+        partn.closeWriter();
+      }
+    }
+
+    //
+    //  Traverse all the in-memory partitions' incoming batches, and build their hash tables
+    //
+/*
+    for (int currPart = 0; currPart < numPartitions; currPart++) {
+
+      // each partition is a regular array of batches
+      ArrayList<VectorContainer> thisPart = new ArrayList<>();
 
-                    /* For every new build batch, we store some state in the helper context
-                     * Add new state to the helper context
-                     */
-        hjHelper.addNewBatch(currentRecordCount);
+      for (int curr = 0; curr < partitionBatchesCount[currPart]; curr++) {
+        VectorContainer nextBatch = tmpBatchesList[currPart].get(curr);
+        final int currentRecordCount = nextBatch.getRecordCount();
+
+        // For every incoming build batch, we create a matching helper batch
+        hjHelpers[currPart].addNewBatch(currentRecordCount);
 
        // Holder contains the global index where the key is hashed into using the hash table
         final IndexPointer htIndex = new IndexPointer();
 
-        // For every record in the build batch , hash the key columns
-        for (int i = 0; i < currentRecordCount; i++) {
-          int hashCode = hashTable.getHashCode(i);
-          try {
-            hashTable.put(i, htIndex, hashCode);
-          } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put");} // Hash Join can not retry yet
-                        /* Use the global index returned by the hash table, to store
-                         * the current record index and batch index. This will be used
-                         * later when we probe and find a match.
-                         */
-          hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-        }
+        hashTables[currPart].updateIncoming(nextBatch, probeBatch );
 
-                    /* Completed hashing all records in this batch. Transfer the batch
-                     * to the hyper vector container. Will be used when we want to retrieve
-                     * records that have matching keys on the probe side.
-                     */
-        final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator());
-        boolean success = false;
-        try {
-          if (hyperContainer == null) {
-            hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
-          } else {
-            hyperContainer.addBatch(nextBatch.getContainer());
-          }
+          IntVector HV_vector = (IntVector) nextBatch.getValueVector(rightHVColPosition).getValueVector();
 
-          // completed processing a batch, increment batch index
-          buildBatchIndex++;
-          success = true;
-        } finally {
-          if (!success) {
-            nextBatch.clear();
-          }
+        for (int recInd = 0; recInd < currentRecordCount; recInd++) {
+          int hashCode = HV_vector.getAccessor().get(recInd);
+          try {
+            hashTables[currPart].put(recInd, htIndex, hashCode);
+          } catch (RetryAfterSpillException RE) {
+            throw new OutOfMemoryException("HT put");
+          } // Hash Join can not retry yet
+                        // Use the global index returned by the hash table, to store
+                        // the current record index and batch index. This will be used
+                        // later when we probe and find a match.
+                        //
+          hjHelpers[currPart].setCurrentIndex(htIndex.value, curr , recInd);
         }
-        break;
+
+        thisPart.add(nextBatch);
       }
-      // Get the next record batch
-      rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
-    }
-  }
 
-  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
-    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
-    cg.plainJavaCapable(true);
-    final ClassGenerator<HashJoinProbe> g = cg.getRoot();
+      partitionContainers.add(thisPart);
+*/
+    for (HashPartition partn : partitions) {
+      partn.buildContainersHashTableAndHelper();
+    }
 
-    // Generate the code to project build side records
-    g.setMappingSet(projectBuildMapping);
+  }
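
The build loop above reads every build-side batch, routes each row to partitions[hashCode & partitionMask], and spills a partition's completed batches once that partition has already spilled or memory pressure is detected. A much-simplified, memory-only model of that flow (invented types; the real code appends into value vectors and writes through SpillSet):

    // Toy model of the build-side routing and the crude when-to-spill rule above.
    import java.util.ArrayList;
    import java.util.List;

    public class BuildRoutingSketch {
      static final int RECORDS_PER_BATCH = 4;
      static final int MAX_BATCHES_PER_PARTITION = 2;

      static class Partition {
        final List<List<Integer>> batches = new ArrayList<>(); // in-memory batches
        List<Integer> current = new ArrayList<>();
        boolean spilled = false;
        int spilledBatches = 0;

        void appendInnerRow(int row, boolean spillIfFull) {
          current.add(row);
          if (current.size() == RECORDS_PER_BATCH) {            // current batch is full
            if (spillIfFull) { spilled = true; spilledBatches++; } // real code writes it to disk
            else { batches.add(current); }
            current = new ArrayList<>();
          }
        }
      }

      public static void main(String[] args) {
        int numPartitions = 4;                 // assumed power of two
        int partitionMask = numPartitions - 1;
        Partition[] partitions = new Partition[numPartitions];
        for (int i = 0; i < numPartitions; i++) { partitions[i] = new Partition(); }

        for (int row = 0; row < 100; row++) {
          int hashCode = row * 0x9E3779B9;     // stand-in for the real key hash
          Partition p = partitions[hashCode & partitionMask];
          // crude when-to-spill rule, mirroring the predicate in the loop above
          p.appendInnerRow(row, p.spilled || p.batches.size() > MAX_BATCHES_PER_PARTITION);
        }
        for (int i = 0; i < numPartitions; i++) {
          System.out.println("partition " + i + ": spilled=" + partitions[i].spilled
              + " in-memory batches=" + partitions[i].batches.size());
        }
      }
    }
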
 
-    int fieldId = 0;
-    final JExpression buildIndex = JExpr.direct("buildIndex");
-    final JExpression outIndex = JExpr.direct("outIndex");
-    g.rotateBlock();
+  private void setupOutputContainerSchema() {
 
     if (rightSchema != null) {
       for (final MaterializedField field : rightSchema) {
@@ -427,27 +564,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         final MaterializedField projected = field.withType(outputType);
         // Add the vector to our output container
         container.addOrGet(projected);
-
-        final JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
-        final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
-        g.getEvalBlock().add(outVV.invoke("copyFromSafe")
-            .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-            .arg(outIndex)
-            .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
-        g.rotateBlock();
-        fieldId++;
       }
     }
 
-    // Generate the code to project probe side records
-    g.setMappingSet(projectProbeMapping);
-
-    int outputFieldId = fieldId;
-    fieldId = 0;
-    final JExpression probeIndex = JExpr.direct("probeIndex");
-
    if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
-      for (final VectorWrapper<?> vv : left) {
+      for (final VectorWrapper<?> vv : probeBatch) {
         final MajorType inputType = vv.getField().getType();
         final MajorType outputType;
 
@@ -465,25 +586,22 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
           vv.getValueVector().makeTransferPair(v);
           v.clear();
         }
-
-        final JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
-        final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
-
-        g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
-        g.rotateBlock();
-        fieldId++;
-        outputFieldId++;
       }
     }
 
-    final HashJoinProbe hj = context.getImplementationClass(cg);
-    return hj;
   }
 
   private void allocateVectors() {
     for (final VectorWrapper<?> v : container) {
       v.getValueVector().allocateNew();
     }
+    container.setRecordCount(0); // reset container's counter back to zero records
+  }
+
+  // (After the inner side was read whole) - Has that inner partition spilled
+  public boolean isSpilledInner(int part) {
+    if ( spilledInners == null ) { return false; } // empty inner
+    return spilledInners[part] != null;
   }
 
   public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
@@ -491,56 +609,379 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       RecordBatch right /*Build side record batch*/
   ) throws OutOfMemoryException {
     super(popConfig, context, true, left, right);
+    this.buildBatch = right;
+    this.probeBatch = left;
     joinType = popConfig.getJoinType();
     conditions = popConfig.getConditions();
+    this.popConfig = popConfig;
 
     comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+    // When DRILL supports Java 8, use the following instead of the for() loop
+    // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
     for (int i=0; i<conditions.size(); i++) {
       JoinCondition cond = conditions.get(i);
       comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
     }
+    this.allocator = oContext.getAllocator();
+
+    final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
+
+    if (memLimit != 0) {
+      allocator.setLimit(memLimit);
+    }
+
+    RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
+    maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+
+    spillSet = new SpillSet(context, popConfig);
+
+    // Create empty partitions (in the ctor - covers the case where right side is empty)
+    partitions = new HashPartition[0];
   }
 
-  private void updateStats(HashTable htable) {
-    if (htable == null) {
-      return;
+  public void cleanup() {
+    if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
+    if ( spillSet.getWriteBytes() > 0 ) {
+      stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+        (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+    }
+    // clean (and deallocate) each partition
+    for (HashPartition partn : partitions) {
+      partn.clearHashTableAndHelper();
+      partn.closeWriterAndDeleteFile();
+    }
+
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      HJSpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.innerSpillFile);
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile);
+      }
+      try { // outer file is added later; may be null if cleaning prematurely
+        if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); }
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile);
+      }
+    }
+    // Delete the currently handled (if any) spilled files
+    spillSet.close(); // delete the spill directory(ies)
+  }
+
+  private void updateStats() {
+    if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
+    if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+    long numSpilled = 0;
+    HashTableStats newStats = new HashTableStats();
+    // sum the stats from all the partitions
+    for ( HashPartition partn : partitions ) {
+      if ( partn.isSpilled() ) { numSpilled++; }
+      partn.getStats(newStats);
+      htStats.addStats(newStats);
     }
-    htable.getStats(htStats);
-    stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-    stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-    stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+
+    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
+    this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
   @Override
   public void killIncoming(boolean sendUpstream) {
-    left.kill(sendUpstream);
-    right.kill(sendUpstream);
+    wasKilled = true;
+    probeBatch.kill(sendUpstream);
+    buildBatch.kill(sendUpstream);
   }
 
   @Override
   public void close() {
-    if (hjHelper != null) {
-      hjHelper.clear();
+    for ( HashPartition partn : partitions ) {
+      partn.close();
     }
+    cleanup();
+    super.close();
+  }
+
+  // ==============================================================
+  //
+  //    Methods used for the probe
+  //
+  // ============================================================
+  private BatchSchema probeSchema;
+
+  enum ProbeState {
+    PROBE_PROJECT, PROJECT_RIGHT, DONE
+  }
+
+  private int currRightPartition = 0; // for returning RIGHT/FULL
+
+  // Number of records to process on the probe side
+  private int recordsToProcess = 0;
+
+  // Number of records processed on the probe side
+  private int recordsProcessed = 0;
+
+  // Indicate if we should drain the next record from the probe side
+  private boolean getNextRecord = true;
+
+  // Contains both batch idx and record idx of the matching record in the build side
+  private int currentCompositeIdx = -1;
+
+  // Current state the hash join algorithm is in
+  private ProbeState probeState = ProbeState.PROBE_PROJECT;
 
-    // If we didn't receive any data, hyperContainer may be null, check before 
clearing
-    if (hyperContainer != null) {
-      hyperContainer.clear();
+  // For outer or right joins, this is a list of unmatched records that needs to be projected
+  private List<Integer> unmatchedBuildIndexes = null;
+
+  // While probing duplicates, retain current build-side partition in case need to continue
+  // probing later on the same chain of duplicates
+  private HashPartition currPartition;
+
+  /**
+   * Various initialization needed to perform the probe
+   * Must be called AFTER the build completes
+   */
+  private void setupProbe() {
+    currRightPartition = 0; // In case it's a Right/Full outer join
+    recordsProcessed = 0;
+    recordsToProcess = 0;
+
+    probeSchema = probeBatch.getSchema();
+    probeState = ProbeState.PROBE_PROJECT;
+
+    // A special case - if the left was an empty file
+    if ( leftUpstream == IterOutcome.NONE ){
+      changeToFinalProbeState();
+    } else {
+      this.recordsToProcess = probeBatch.getRecordCount();
     }
 
-    if (hashTable != null) {
-      hashTable.clear();
+    // for those outer partitions that need spilling (cause their matching inners spilled)
+    // initialize those partitions' current batches and hash-value vectors
+    for ( HashPartition partn : partitions ) {
+      partn.allocateNewCurrentBatchAndHV();
+    }
+
+    if ( cycleNum > 0 ) {
+      if ( read_HV_vector != null ) { read_HV_vector.clear();}
+      if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty
+        read_HV_vector = (IntVector) probeBatch.getContainer().getLast();
+      }
+    }
+  }
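
One detail worth noting in setupProbe(): whenever cycleNum > 0 the incoming batches come from a spill file, and their last vector is the extra hash-value ("HV") column written during the previous cycle, so the hash is read back rather than recomputed. A plain-Java sketch of that idea (no Drill vectors, invented hash function), not taken from the patch:

    // Illustration only: cycle 0 computes the hash from the key; later cycles reuse the stored HV column.
    public class HvColumnSketch {
      static int hashOfKey(int key) { return key * 0x9E3779B9; } // stand-in for the real hash

      static int hashForRow(int cycleNum, int[] keys, int[] hvColumn, int row) {
        return (cycleNum == 0) ? hashOfKey(keys[row]) : hvColumn[row];
      }

      public static void main(String[] args) {
        int[] keys = {42, 7, 42};
        int[] hv = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
          hv[i] = hashOfKey(keys[i]) >>> 5;   // what a spill with 32 partitions would have kept
        }
        System.out.println(hashForRow(0, keys, hv, 1)); // first pass: computed
        System.out.println(hashForRow(1, keys, hv, 1)); // reading a spilled partition: reused
      }
    }
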
+
+  private void executeProjectRightPhase(int currBuildPart) {
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
+      outputRecords =
+        container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
+          null /* no probeBatch */, 0 /* no probe index */ );
+      recordsProcessed++;
+    }
+  }
+
+  private void executeProbePhase() throws SchemaChangeException {
+
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
+
+      // Check if we have processed all records in this batch; if so, we need to invoke next
+      if (recordsProcessed == recordsToProcess) {
+
+        // Done processing all records in the previous batch, clean up!
+        for (VectorWrapper<?> wrapper : probeBatch) {
+          wrapper.getValueVector().clear();
+        }
+
+        IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
+
+        switch (leftUpstream) {
+          case NONE:
+          case NOT_YET:
+          case STOP:
+            recordsProcessed = 0;
+            recordsToProcess = 0;
+            changeToFinalProbeState();
+            // in case some outer partitions were spilled, need to spill their last batches
+            for ( HashPartition partn : partitions ) {
+              if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+              partn.completeAnOuterBatch(false);
+              // update the partition's spill record with the outer side
+              HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
+              sp.outerSpillFile = partn.getSpillFile();
+              sp.outerSpilledBatches = partn.getPartitionBatchesCount();
+
+              partn.closeWriter();
+            }
+
+            continue;
+
+          case OK_NEW_SCHEMA:
+            if (probeBatch.getSchema().equals(probeSchema)) {
+              for ( HashPartition partn : partitions ) { partn.updateBatches(); }
+
+            } else {
+              throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
+                probeSchema,
+                probeBatch.getSchema());
+            }
+          case OK:
+            recordsToProcess = probeBatch.getRecordCount();
+            recordsProcessed = 0;
+            // If we received an empty batch do nothing
+            if (recordsToProcess == 0) {
+              continue;
+            }
+            if ( cycleNum > 0 ) {
+              read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
+            }
+        }
+      }
+      int probeIndex = -1;
+      // Check if we need to drain the next row in the probe side
+      if (getNextRecord) {
+
+        if ( !buildSideIsEmpty ) {
+          int hashCode = ( cycleNum == 0 ) ?
+            partitions[0].getProbeHashCode(recordsProcessed)
+            : read_HV_vector.getAccessor().get(recordsProcessed);
+          int currBuildPart = hashCode & partitionMask ;
+          hashCode >>>= bitsInMask;
+
+          // Set and keep the current partition (may be used again on subsequent probe calls as
+          // inner rows of duplicate key are processed)
+          currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
+
+          // If the matching inner partition was spilled
+          if ( isSpilledInner(currBuildPart) ) {
+            // add this row to its outer partition (may cause a spill, when the batch is full)
+
+            currPartition.appendOuterRow(hashCode, recordsProcessed);
+
+            recordsProcessed++; // done with this outer record
+            continue; // on to the next outer record
+          }
+
+          probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
+
+        }
+
+        if (probeIndex != -1) {
+
+          /* The current probe record has a key that matches. Get the index
+           * of the first row in the build side that matches the current key
+           * (and record this match in the bitmap, in case of a FULL/RIGHT join)
+           */
+          currentCompositeIdx = currPartition.getStartIndex(probeIndex);
+
+          outputRecords =
+            container.appendRow(currPartition.getContainers(), currentCompositeIdx,
+              probeBatch.getContainer(), recordsProcessed);
+
+          /* Projected single row from the build side with matching key but there
+           * may be more rows with the same key. Check if that's the case
+           */
+          currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+          if (currentCompositeIdx == -1) {
+            /* We only had one row in the build side that matched the current key
+             * from the probe side. Drain the next row in the probe side.
+             */
+            recordsProcessed++;
+          } else {
+            /* There is more than one row with the same key on the build side
+             * don't drain more records from the probe side till we have projected
+             * all the rows with this key
+             */
+            getNextRecord = false;
+          }
+        } else { // No matching key
+
+          // If we have a left outer join, project the outer side
+          if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+
+            outputRecords =
+              container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
+          }
+          recordsProcessed++;
+        }
+      }
+      else { // match the next inner row with the same key
+
+        currPartition.setRecordMatched(currentCompositeIdx);
+
+        outputRecords =
+          container.appendRow(currPartition.getContainers(), currentCompositeIdx,
+            probeBatch.getContainer(), recordsProcessed);
+
+        currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+
+        if (currentCompositeIdx == -1) {
+          // We don't have any more rows matching the current key on the build side, move on to the next probe row
+          getNextRecord = true;
+          recordsProcessed++;
+        }
+      }
     }
-    super.close();
   }
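
The probe loop above is essentially a chain walk: a matching probe key yields the first composite index via getStartIndex(), and getNextIndex() follows the chain of duplicate build keys, pausing (getNextRecord = false) when the outgoing batch fills up. A simplified in-memory analogue, with invented structures, not taken from the patch:

    // Toy analogue of the probe: walk the chain of build rows per key; emit nulls only for LEFT/FULL.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ProbeChainSketch {
      public static void main(String[] args) {
        // build side: key -> chain of build-row ids (what getStartIndex()/getNextIndex() walk)
        Map<Integer, List<Integer>> build = new HashMap<>();
        build.put(10, Arrays.asList(0, 3, 7)); // duplicate build keys form a chain
        build.put(20, Arrays.asList(1));

        int[] probeKeys = {10, 20, 30};
        boolean leftOrFullJoin = true;

        for (int probeRow = 0; probeRow < probeKeys.length; probeRow++) {
          List<Integer> chain = build.get(probeKeys[probeRow]);
          if (chain == null) {
            if (leftOrFullJoin) { System.out.println("probe " + probeRow + " -> nulls"); }
            continue;
          }
          for (int buildRow : chain) { // the real code pauses here when the outgoing batch is full
            System.out.println("probe " + probeRow + " -> build row " + buildRow);
          }
        }
      }
    }
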
 
   /**
-   * This method checks to see if join processing should be continued further.
-   * @param upStream up stream operator status.
-   * @@return true if up stream status is OK or OK_NEW_SCHEMA otherwise false.
+   *  Perform the probe and project the results
+   *
+   * @return number of output records
+   * @throws SchemaChangeException
    */
-  private boolean isFurtherProcessingRequired(IterOutcome upStream) {
-    return upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA;
+  private int probeAndProject() throws SchemaChangeException {
+
+    outputRecords = 0;
+
+    // When handling spilled partitions, the state becomes DONE at the end of each partition
+    if ( probeState == ProbeState.DONE ) {
+      return outputRecords; // that is zero
+    }
+
+    if (probeState == ProbeState.PROBE_PROJECT) {
+      executeProbePhase();
+    }
+
+    if (probeState == ProbeState.PROJECT_RIGHT) {
+      // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
+
+      do {
+
+        if (unmatchedBuildIndexes == null) { // first time for this partition ?
+          if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
+          // Get this partition's list of build indexes that didn't match any record on the probe side
+          unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
+          recordsProcessed = 0;
+          recordsToProcess = unmatchedBuildIndexes.size();
+        }
+
+        // Project the list of unmatched records on the build side
+        executeProjectRightPhase(currRightPartition);
+
+        if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
+          return outputRecords;  // outgoing is full; report and come back later
+        } else {
+          currRightPartition++; // on to the next right partition
+          unmatchedBuildIndexes = null;
+        }
+
+      }   while ( currRightPartition < numPartitions );
+
+      probeState = ProbeState.DONE; // last right partition was handled; we are done now
+    }
+
+    return outputRecords;
   }
-}
+
+  private void changeToFinalProbeState() {
+    // We are done with the (left) probe phase.
+    // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
+    probeState =
+      (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
+      ProbeState.DONE; // else we're done
+  }
+
+}  // public class HashJoinBatch
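
Taken together, the probe-side methods above form a small state machine: PROBE_PROJECT streams probe rows against the in-memory partitions (diverting outer rows whose matching inner partition spilled), changeToFinalProbeState() decides whether PROJECT_RIGHT is needed, and PROJECT_RIGHT walks each partition's unmatched build indexes before landing in DONE. A toy walk through those transitions (the enum names mirror the code above; the driver is invented):

    // Toy driver for the ProbeState transitions; assumes a RIGHT or FULL join.
    public class ProbeStateSketch {
      enum ProbeState { PROBE_PROJECT, PROJECT_RIGHT, DONE }

      public static void main(String[] args) {
        boolean rightOrFullJoin = true;
        int numPartitions = 4;

        ProbeState state = ProbeState.PROBE_PROJECT;
        System.out.println(state + ": stream probe rows against each partition's hash table");

        // probe side exhausted -> the equivalent of changeToFinalProbeState()
        state = rightOrFullJoin ? ProbeState.PROJECT_RIGHT : ProbeState.DONE;

        if (state == ProbeState.PROJECT_RIGHT) {
          for (int part = 0; part < numPartitions; part++) {
            System.out.println("partition " + part + ": project build rows never marked matched");
          }
          state = ProbeState.DONE;
        }
        System.out.println(state);
      }
    }
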
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index e8d747e..55146f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -103,6 +103,9 @@ public class HashJoinHelper {
     public BitSet getKeyMatchBitVector() {
       return keyMatchBitVector;
     }
+    public void clear() {
+      keyMatchBitVector.clear();
+    }
   }
 
  public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
@@ -231,5 +234,6 @@ public class HashJoinHelper {
     for (BuildInfo info : buildInfoList) {
       info.getLinks().clear();
     }
+    buildInfoList.clear();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
deleted file mode 100644
index 36d9baa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.calcite.rel.core.JoinRelType;
-
-public interface HashJoinProbe {
-  TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
-
-  /* The probe side of the hash join can be in the following two states
-   * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
-   *    key match and if we do we project the record
-   * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
-   *    from the build side that did not match any records on the probe side. For Left outer
-   *    case we handle it internally by projecting the record if there isn't a match on the build side
-   * 3. DONE: Once we have projected all possible records we are done
-   */
-  enum ProbeState {
-    PROBE_PROJECT, PROJECT_RIGHT, DONE
-  }
-
-  void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
-                          int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
-                          JoinRelType joinRelType, RecordBatch.IterOutcome leftStartState);
-  void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
-  int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
-  void projectBuildRecord(int buildIndex, int outIndex);
-  void projectProbeRecord(int probeIndex, int outIndex);
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
deleted file mode 100644
index 1a85277..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import java.io.IOException;
-import java.util.List;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.calcite.rel.core.JoinRelType;
-
-public abstract class HashJoinProbeTemplate implements HashJoinProbe {
-
-  // Probe side record batch
-  private RecordBatch probeBatch;
-
-  private BatchSchema probeSchema;
-
-  private VectorContainer buildBatch;
-
-  // Join type, INNER, LEFT, RIGHT or OUTER
-  private JoinRelType joinType;
-
-  private HashJoinBatch outgoingJoinBatch = null;
-
-  private static final int TARGET_RECORDS_PER_BATCH = 4000;
-
-  /* Helper class
-   * Maintains linked list of build side records with the same key
-   * Keeps information about which build records have a corresponding
-   * matching key in the probe side (for outer, right joins)
-   */
-  private HashJoinHelper hjHelper = null;
-
-  // Underlying hashtable used by the hash join
-  private HashTable hashTable = null;
-
-  // Number of records to process on the probe side
-  private int recordsToProcess = 0;
-
-  // Number of records processed on the probe side
-  private int recordsProcessed = 0;
-
-  // Number of records in the output container
-  private int outputRecords;
-
-  // Indicate if we should drain the next record from the probe side
-  private boolean getNextRecord = true;
-
-  // Contains both batch idx and record idx of the matching record in the 
build side
-  private int currentCompositeIdx = -1;
-
-  // Current state the hash join algorithm is in
-  private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
-  // For outer or right joins, this is a list of unmatched records that needs 
to be projected
-  private List<Integer> unmatchedBuildIndexes = null;
-
-  @Override
-  public void setupHashJoinProbe(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch,
-                                 int probeRecordCount, HashJoinBatch outgoing, 
HashTable hashTable,
-                                 HashJoinHelper hjHelper, JoinRelType 
joinRelType, IterOutcome leftStartState) {
-
-    this.probeBatch = probeBatch;
-    this.probeSchema = probeBatch.getSchema();
-    this.buildBatch = buildBatch;
-    this.joinType = joinRelType;
-    this.recordsToProcess = probeRecordCount;
-    this.hashTable = hashTable;
-    this.hjHelper = hjHelper;
-    this.outgoingJoinBatch = outgoing;
-
-    if (leftStartState == IterOutcome.NONE) {
-      if (joinRelType == JoinRelType.RIGHT) {
-        probeState = ProbeState.PROJECT_RIGHT;
-      } else {
-        probeState = ProbeState.DONE;
-      }
-    }
-
-    doSetup(context, buildBatch, probeBatch, outgoing);
-  }
-
-  public void executeProjectRightPhase() {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
-      projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), 
outputRecords);
-      recordsProcessed++;
-      outputRecords++;
-    }
-  }
-
-  public void executeProbePhase() throws SchemaChangeException {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != 
ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
-      // Check if we have processed all records in this batch we need to 
invoke next
-      if (recordsProcessed == recordsToProcess) {
-
-        // Done processing all records in the previous batch, clean up!
-        for (VectorWrapper<?> wrapper : probeBatch) {
-          wrapper.getValueVector().clear();
-        }
-
-        IterOutcome leftUpstream = 
outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
-        switch (leftUpstream) {
-          case NONE:
-          case NOT_YET:
-          case STOP:
-            recordsProcessed = 0;
-            recordsToProcess = 0;
-            probeState = ProbeState.DONE;
-
-            // We are done with the probe phase. If its a RIGHT or a FULL join 
get the unmatched indexes from the build side
-            if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) 
{
-                probeState = ProbeState.PROJECT_RIGHT;
-            }
-
-            continue;
-
-          case OK_NEW_SCHEMA:
-            if (probeBatch.getSchema().equals(probeSchema)) {
-              doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, 
outgoingJoinBatch);
-              if (hashTable != null) {
-                hashTable.updateBatches();
-              }
-            } else {
-              throw SchemaChangeException.schemaChanged("Hash join does not 
support schema changes in probe side.",
-                  probeSchema,
-                  probeBatch.getSchema());
-            }
-          case OK:
-            recordsToProcess = probeBatch.getRecordCount();
-            recordsProcessed = 0;
-            // If we received an empty batch do nothing
-            if (recordsToProcess == 0) {
-              continue;
-            }
-        }
-      }
-      int probeIndex = -1;
-
-      // Check if we need to drain the next row in the probe side
-      if (getNextRecord) {
-        if (hashTable != null && !hashTable.isEmpty()) {
-          probeIndex = hashTable.containsKey(recordsProcessed, true);
-        }
-
-          if (probeIndex != -1) {
-
-            /* The current probe record has a key that matches. Get the index
-             * of the first row in the build side that matches the current key
-             */
-            currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
-            /* Record in the build side at currentCompositeIdx has a matching 
record in the probe
-             * side. Set the bit corresponding to this index so if we are 
doing a FULL or RIGHT
-             * join we keep track of which records we need to project at the 
end
-             */
-            hjHelper.setRecordMatched(currentCompositeIdx);
-
-            projectBuildRecord(currentCompositeIdx, outputRecords);
-            projectProbeRecord(recordsProcessed, outputRecords);
-            outputRecords++;
-            /* Projected single row from the build side with matching key but 
there
-             * may be more rows with the same key. Check if that's the case
-             */
-            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-            if (currentCompositeIdx == -1) {
-              /* We only had one row in the build side that matched the 
current key
-               * from the probe side. Drain the next row in the probe side.
-               */
-              recordsProcessed++;
-            } else {
-              /* There is more than one row with the same key on the build side
-               * don't drain more records from the probe side till we have 
projected
-               * all the rows with this key
-               */
-              getNextRecord = false;
-            }
-        } else { // No matching key
-
-            // If we have a left outer join, project the keys
-            if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-              projectProbeRecord(recordsProcessed, outputRecords);
-              outputRecords++;
-            }
-            recordsProcessed++;
-          }
-      } else {
-        hjHelper.setRecordMatched(currentCompositeIdx);
-        projectBuildRecord(currentCompositeIdx, outputRecords);
-        projectProbeRecord(recordsProcessed, outputRecords);
-        outputRecords++;
-
-        currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
-        if (currentCompositeIdx == -1) {
-          // We don't have any more rows matching the current key on the build 
side, move on to the next probe row
-          getNextRecord = true;
-          recordsProcessed++;
-        }
-      }
-    }
-  }
-
-  public int probeAndProject() throws SchemaChangeException, 
ClassTransformationException, IOException {
-
-    outputRecords = 0;
-
-    if (probeState == ProbeState.PROBE_PROJECT) {
-      executeProbePhase();
-    }
-
-    if (probeState == ProbeState.PROJECT_RIGHT) {
-
-      // We are here because we have a RIGHT OUTER or a FULL join
-      if (unmatchedBuildIndexes == null) {
-        // Initialize list of build indexes that didn't match a record on the 
probe side
-        unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
-        recordsToProcess = unmatchedBuildIndexes.size();
-        recordsProcessed = 0;
-      }
-
-      // Project the list of unmatched records on the build side
-      executeProjectRightPhase();
-    }
-
-    return outputRecords;
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") 
RecordBatch probeBatch,
-                               @Named("outgoing") RecordBatch outgoing);
-  public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, 
@Named("outIndex") int outIndex);
-
-  public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, 
@Named("outIndex") int outIndex);
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 2f9ab14..d0421f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.cache.VectorSerializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
@@ -432,6 +433,10 @@ public class SpillSet {
         operName = "HashAgg";
         spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
         dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
+    } else if (popConfig instanceof HashJoinPOP) {
+      operName = "HashJoin";
+      spillFs = config.getString(ExecConstants.HASHJOIN_SPILL_FILESYSTEM);
+      dirList = config.getStringList(ExecConstants.HASHJOIN_SPILL_DIRS);
     } else {
         // just use the common ones
         operName = "Unknown";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 9da8a4b..433e0c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -220,6 +220,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
 
+  @Override
+  public VectorContainer getContainer() {
+    return batchLoader.getContainer();
+  }
+
   private void informSenders() {
     logger.info("Informing senders of request to terminate sending.");
     final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 05eb545..378c980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -359,6 +359,11 @@ public class IteratorValidatorBatchIterator implements 
CloseableRecordBatch {
                       this.getClass().getCanonicalName()));
   }
 
+  @Override
+  public VectorContainer getContainer() {
+    return incoming.getContainer();
+  }
+
   public RecordBatch getIncoming() { return incoming; }
 
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 9d383c1..054ceec 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -228,4 +228,9 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
+
+  @Override
+  public VectorContainer getContainer() {
+    return container;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fe7f9e9..ded4351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -255,6 +255,13 @@ public interface RecordBatch extends VectorAccessible {
   VectorContainer getOutgoingContainer();
 
   /**
+   * Returns the internal vector container of this batch.
+   *
+   * @return the internal vector container
+   */
+  VectorContainer getContainer();
+
+  /**
    * Gets the value vector type and ID for the given schema path.  The
    * TypedFieldId should store a fieldId which is the same as the ordinal
    * position of the field within the Iterator provided this class's
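
As a usage sketch (not part of this patch): the new getContainer() accessor
exposes an operator's internal container so that a consumer such as the
spilling HashJoin can copy rows out of it directly. Combined with the
appendRow() helper added to VectorContainer further below, that could look
roughly like the following; the CopyRowSketch/copyRow names are hypothetical,
and the sketch assumes a non-hyper batch with no selection vector.

    import org.apache.drill.exec.record.RecordBatch;
    import org.apache.drill.exec.record.VectorContainer;

    class CopyRowSketch {
      // Hypothetical helper: copy one row from an operator's internal
      // container into an outgoing container.
      static int copyRow(RecordBatch incoming, VectorContainer outgoing, int rowIndex) {
        VectorContainer src = incoming.getContainer(); // accessor added above
        return outgoing.appendRow(src, rowIndex);      // appendRow() added below
      }
    }
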
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
index 40447be..9dfa129 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java
@@ -104,4 +104,7 @@ public class SchemalessBatch implements 
CloseableRecordBatch {
   public void close() throws Exception {
     // This is present to match BatchCreator#getBatch() returning type.
   }
+
+  @Override
+  public VectorContainer getContainer() { return null; }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
index b6b7b21..4063e55 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -94,4 +94,9 @@ public class SimpleRecordBatch implements RecordBatch {
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
+
+  @Override
+  public VectorContainer getContainer() {
+    return container;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 03e9249..06a89b0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import java.lang.reflect.Array;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -42,7 +43,10 @@ public class VectorContainer implements VectorAccessible {
   private final BufferAllocator allocator;
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
   private BatchSchema schema;
-  private int recordCount = -1;
+
+  private int recordCount = 0;
+  private boolean initialized = false;
   private boolean schemaChanged = true; // Schema has changed since last 
built. Must rebuild schema
 
   public VectorContainer() {
@@ -209,6 +213,109 @@ public class VectorContainer implements VectorAccessible {
     }
   }
 
+  /**
+   * Appends a row taken from a source {@link VectorContainer} to this
+   * {@link VectorContainer}. Works only with non-hyper containers that
+   * carry no selection vector.
+   * @param srcContainer The {@link VectorContainer} to copy a row from.
+   * @param srcIndex The index of the row to copy from the source container.
+   * @return Position at which the row was appended
+   */
+  public int appendRow(VectorContainer srcContainer, int srcIndex) {
+    for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
+      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+      ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
+      destVector.copyEntry(recordCount, srcVector, srcIndex);
+    }
+    int pos = recordCount++;
+    initialized = true;
+    return pos;
+  }
+
+  /**
+   * Currently used only by the Hash Join, to return a row composed of the
+   * build-side columns followed by the probe-side columns.
+   *
+   * Works only with non-hyper {@link VectorContainer}s that carry no selection
+   * vector. Appends a row taken from two source containers to this container.
+   * @param buildSrcContainer The container to copy the first (build) columns of a row from.
+   * @param buildSrcIndex The index of the row to copy from the build side container.
+   * @param probeSrcContainer The container to copy the last (probe) columns of a row from.
+   * @param probeSrcIndex The index of the row to copy from the probe side container.
+   * @return Number of records in this container after the append
+   */
+  public int appendRowXXX(VectorContainer buildSrcContainer, int 
buildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
+    if ( buildSrcContainer != null ) {
+      for (int vectorIndex = 0; vectorIndex < 
buildSrcContainer.wrappers.size(); vectorIndex++) {
+        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+        ValueVector srcVector = 
buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
+        destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
+      }
+    }
+    if ( probeSrcContainer != null ) {
+      int baseIndex = wrappers.size() - probeSrcContainer.wrappers.size();
+      for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); 
vectorIndex++) {
+        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+        ValueVector srcVector = 
probeSrcContainer.wrappers.get(vectorIndex).getValueVector();
+        destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
+      }
+    }
+    recordCount++;
+    initialized = true;
+    return recordCount;
+  }
+
+  private int appendBuild(VectorContainer buildSrcContainer, int buildSrcIndex) {
+    // "- 1" to skip the last ("hash values") column added on the build side
+    int lastIndex = buildSrcContainer.wrappers.size() - 1;
+    for (int vectorIndex = 0; vectorIndex < lastIndex; vectorIndex++) {
+      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+      ValueVector srcVector = buildSrcContainer.wrappers.get(vectorIndex).getValueVector();
+      destVector.copyEntry(recordCount, srcVector, buildSrcIndex);
+    }
+    return lastIndex;
+  }
+
+  // Copies the probe-side columns of one row into this container's wrappers,
+  // starting at baseIndex (the first probe-side wrapper in this container).
+  private void appendProbe(VectorContainer probeSrcContainer, int probeSrcIndex,
+                           int baseIndex) {
+    for (int vectorIndex = baseIndex; vectorIndex < wrappers.size(); vectorIndex++) {
+      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+      ValueVector srcVector =
+        probeSrcContainer.wrappers.get(vectorIndex - baseIndex).getValueVector();
+      destVector.copyEntry(recordCount, srcVector, probeSrcIndex);
+    }
+  }
+
+  /**
+   *  A special version of appendRow for the HashJoin; uses a composite index 
for the build side
+   * @param buildSrcContainers The containers list for the right side
+   * @param compositeBuildSrcIndex Composite build index
+   * @param probeSrcContainer The single container for the left/outer side
+   * @param probeSrcIndex Index in the outer container
+   * @return Number of rows in this container (after the append)
+   */
+  public int appendRow(ArrayList<VectorContainer> buildSrcContainers, int 
compositeBuildSrcIndex, VectorContainer probeSrcContainer, int probeSrcIndex) {
+    int buildBatch = compositeBuildSrcIndex >>> 16;
+    int buildOffset = compositeBuildSrcIndex & 65535;
+    int baseInd = 0;
+    if ( buildSrcContainers != null ) { baseInd = 
appendBuild(buildSrcContainers.get(buildBatch), buildOffset); }
+    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, 
probeSrcIndex, baseInd); }
+    recordCount++;
+    initialized = true;
+    return recordCount;
+  }
+
+  /**
+   * A variant of the HashJoin appendRow used for a Left Outer Join probe row
+   * that has no build-side match: only the probe columns are appended, so a
+   * base index into this container's wrappers tells where to start appending.
+   * @param probeSrcContainer The probe-side source container
+   * @param probeSrcIndex The index of the row to copy from the probe-side container
+   * @param baseInd Index of this container's wrapper at which to start appending
+   * @return Number of rows in this container after the append
+   */
+  public int appendOuterRow(VectorContainer probeSrcContainer, int 
probeSrcIndex, int baseInd) {
+    appendProbe(probeSrcContainer, probeSrcIndex, baseInd);
+    recordCount++;
+    initialized = true;
+    return recordCount;
+  }
+
   public TypedFieldId add(ValueVector vv) {
     schemaChanged = true;
     schema = null;
@@ -217,6 +324,11 @@ public class VectorContainer implements VectorAccessible {
     return new TypedFieldId(vv.getField().getType(), i);
   }
 
+  public ValueVector getLast() {
+    int sz = wrappers.size();
+    if ( sz == 0 ) { return null; }
+    return wrappers.get(sz - 1).getValueVector();
+  }
   public void add(ValueVector[] hyperVector) {
     add(hyperVector, true);
   }
@@ -343,7 +455,8 @@ public class VectorContainer implements VectorAccessible {
   }
 
   public void setRecordCount(int recordCount) {
-    this.recordCount = recordCount;
+    this.recordCount = recordCount;
+    initialized = true;
   }
 
   @Override
@@ -352,7 +465,7 @@ public class VectorContainer implements VectorAccessible {
     return recordCount;
   }
 
-  public boolean hasRecordCount() { return recordCount != -1; }
+  public boolean hasRecordCount() { return initialized; }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
@@ -418,6 +531,7 @@ public class VectorContainer implements VectorAccessible {
     merged.wrappers.addAll(wrappers);
     merged.wrappers.addAll(otherContainer.wrappers);
     merged.schemaChanged = false;
+    merged.initialized = true;
     return merged;
   }
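
To illustrate the append APIs added to VectorContainer above, a hedged usage
sketch follows. The AppendRowSketch/emitMatchedRow/emitUnmatchedProbeRow names
are hypothetical; appendRow() and appendOuterRow() are the methods from this
patch, and the composite index layout (build batch number in the upper bits,
row offset in the lower 16 bits) mirrors the decoding done inside appendRow().

    import java.util.ArrayList;
    import org.apache.drill.exec.record.VectorContainer;

    class AppendRowSketch {
      // Compose an output row from a build-side row (addressed by a composite
      // index over a list of build batches) plus a probe-side row. appendRow()
      // copies the build columns (minus the trailing hash-values column), then
      // the probe columns, and returns the outgoing row count after the append.
      static int emitMatchedRow(VectorContainer outgoing,
                                ArrayList<VectorContainer> buildBatches,
                                int buildBatchNum, int buildOffset,
                                VectorContainer probeBatch, int probeIndex) {
        int compositeIndex = (buildBatchNum << 16) | (buildOffset & 0xFFFF);
        return outgoing.appendRow(buildBatches, compositeIndex, probeBatch, probeIndex);
      }

      // For a left/outer-join probe row with no build-side match, only the
      // probe columns are appended, starting at the given wrapper index
      // within the outgoing container.
      static int emitUnmatchedProbeRow(VectorContainer outgoing,
                                       VectorContainer probeBatch, int probeIndex,
                                       int firstProbeWrapperIndex) {
        return outgoing.appendOuterRow(probeBatch, probeIndex, firstProbeWrapperIndex);
      }
    }
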
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 7f02773..810fd37 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -117,6 +117,11 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(PlannerSettings.JOIN_OPTIMIZATION),
       new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL),
       new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing
+      new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
+      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR),
+      new 
OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_PER_PARTITION_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
       new 
OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // 
for tuning
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index b2013d5..f321691 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -244,6 +244,17 @@ drill.exec: {
     // if they do not exist.
     directories: [ "/tmp/drill/spill" ]
   },
+  hashjoin: {
+    spill: {
+      // -- The 2 options below can be used to override the common ones
+      // -- (common to all spilling operators)
+      // File system to use. Local file system by default.
+      fs: ${drill.exec.spill.fs},
+      // List of directories to use. Directories are created
+      // if they do not exist.
+      directories:  ${drill.exec.spill.directories},
+    }
+  },
   hashagg: {
     spill: {
       // -- The 2 options below can be used to override the common ones
@@ -427,9 +438,16 @@ drill.exec.options: {
     exec.enable_bulk_load_table_list: false,
     exec.enable_union_type: false,
     exec.errors.verbose: false,
+    exec.hashjoin.mem_limit: 0,
+    exec.hashjoin.num_partitions: 32,
+    exec.hashjoin.num_rows_in_batch: 1024,
+    exec.hashjoin.max_batches_in_memory: 128,
+    exec.hashjoin.max_batches_per_partition: 512,
     exec.hashagg.mem_limit: 0,
     exec.hashagg.min_batches_per_partition: 2,
     exec.hashagg.num_partitions: 32,
+    exec.hashagg.num_rows_in_batch: 128,
+    exec.hashagg.max_batches_in_memory: 65536,
     exec.hashagg.use_memory_prediction: true,
     exec.impersonation.inbound_policies: "[]",
     exec.java.compiler.exp_in_method_size: 50,
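
As a configuration note (not part of this patch): a deployment that wants the
Hash Join to spill somewhere other than the common spill location could
override the keys added above in its drill-override.conf. The values shown
below are placeholders only.

    drill.exec.hashjoin.spill: {
      // Override only for the Hash Join; other operators keep using
      // the common drill.exec.spill.* settings.
      fs: "file:///",
      directories: [ "/path/to/hashjoin_spill" ]
    }

The new exec.hashjoin.* options above can also be adjusted at run time; the
unit tests added below set them through the option manager.
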
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
new file mode 100644
index 0000000..4cfe6b4
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinSpill.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestHashJoinSpill extends PhysicalOpUnitTestBase {
+
+
+  @SuppressWarnings("unchecked")
+  @Test
+  // Should spill, including recursive spill
+  public void testSimpleHashJoinSpill() {
+    HashJoinPOP joinConf = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions",
 4);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch",
 64);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory",
 8);
+    // Put some duplicate values
+    List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a 
string\"}]",
+      "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : 
\"yet another\"}]");
+    List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a 
string\"}]",
+      "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : 
\"yet another\"}]");
+    int numRows = 2_500;
+    for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+      leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+      rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+    }
+
+    opTestBuilder()
+      .physicalOperator(joinConf)
+      .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+      .baselineColumns("lft", "a", "b", "rgt")
+      .expectedTotalRows( numRows + 9 )
+      .go();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRightOuterHashJoinSpill() {
+    HashJoinPOP joinConf = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.RIGHT);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions",
 4);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch",
 64);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory",
 8);
+    // Put some duplicate values
+    List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a 
string\"}]",
+      "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : 
\"yet another\"}]");
+    List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a 
string\"}]",
+      "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : 
\"yet another\"}]");
+    int numRows = 8_000;
+    // Only the right (build) side gets the extra rows; they have no left-side match
+    for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+      rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+    }
+
+    opTestBuilder()
+      .physicalOperator(joinConf)
+      .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+      .baselineColumns("lft", "a", "b", "rgt")
+      .expectedTotalRows( numRows + 9 )
+      .go();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testLeftOuterHashJoinSpill() {
+    HashJoinPOP joinConf = new HashJoinPOP(null, null,
+      Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.LEFT);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions",
 8);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_rows_in_batch",
 64);
+    
operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.max_batches_in_memory",
 12);
+    // Put some duplicate values
+    List<String> leftTable = Lists.newArrayList("[{\"lft\": 0, \"a\" : \"a 
string\"}]",
+      "[{\"lft\": 0, \"a\" : \"a different string\"},{\"lft\": 0, \"a\" : 
\"yet another\"}]");
+    List<String> rightTable = Lists.newArrayList("[{\"rgt\": 0, \"b\" : \"a 
string\"}]",
+      "[{\"rgt\": 0, \"b\" : \"a different string\"},{\"rgt\": 0, \"b\" : 
\"yet another\"}]");
+    int numRows = 4_000;
+    // The inner (right) side gets only half of the keys, to exercise the left-outer join
+    for ( int cnt = 1; cnt <= numRows / 2; cnt++ ) {
+      rightTable.add("[{\"rgt\": " + cnt + ", \"b\" : \"a string\"}]");
+    }
+    for ( int cnt = 1; cnt <= numRows; cnt++ ) {
+      leftTable.add("[{\"lft\": " + cnt + ", \"a\" : \"a string\"}]");
+    }
+
+    opTestBuilder()
+      .physicalOperator(joinConf)
+      .inputDataStreamsJson(Lists.newArrayList(leftTable,rightTable))
+      .baselineColumns("lft", "a", "b", "rgt")
+      .expectedTotalRows( numRows + 9 )
+      .go();
+  }
+}
\ No newline at end of file
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index f45f558..6374f1f 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -151,7 +151,7 @@ public class MiniPlanUnitTestBase extends 
PhysicalOpUnitTestBase {
       }
       Map<String, List<Object>> actualSuperVectors = new TreeMap<String, 
List<Object>>();
 
-      int actualBatchNum = 
DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, 
null, actualSuperVectors);
+      int actualBatchNum = 
DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, 
null, actualSuperVectors, null);
       if (expectBatchNum != null) {
         if (expectBatchNum != actualBatchNum) {
           throw new AssertionError(String.format("Expected %s batches from 
operator tree. But operators return %s batch!", expectBatchNum, 
actualBatchNum));
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 4bab883..16081be 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -216,6 +216,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     private boolean expectNoRows;
     private Long expectedBatchSize;
     private Integer expectedNumBatches;
+    private Integer expectedTotalRows;
 
     @SuppressWarnings({"unchecked", "resource"})
     public void go() {
@@ -235,7 +236,8 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
         testOperator = opCreator.getBatch(fragContext, popConfig, 
incomingStreams);
 
-        Map<String, List<Object>> actualSuperVectors = 
DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), 
expectedBatchSize, expectedNumBatches);
+        Map<String, List<Object>> actualSuperVectors = 
DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), 
expectedBatchSize, expectedNumBatches, expectedTotalRows);
+        if ( expectedTotalRows != null ) { return; } // when checking total 
rows, don't compare actual results
 
         Map<String, List<Object>> expectedSuperVectors;
 
@@ -328,6 +330,11 @@ public class PhysicalOpUnitTestBase extends ExecTest {
       this.expectedBatchSize = batchSize;
       return this;
     }
+
+    public OperatorTestBuilder expectedTotalRows(Integer expectedTotalRows) {
+      this.expectedTotalRows = expectedTotalRows;
+      return this;
+    }
   }
 
   /**
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index b5450e6..57bc79c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -315,15 +315,16 @@ public class DrillTestWrapper {
    * Iterate over batches, and combine the batches into a map, where key is 
schema path, and value is
    * the list of column values across all the batches.
    * @param batches
+   * @param expectedTotalRecords If non-null, assert that the total record
+   *          count across all batches equals this value
    * @return
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
   public static Map<String, List<Object>> 
addToCombinedVectorResults(Iterable<VectorAccessible> batches,
-                                                                     Long 
expectedBatchSize, Integer expectedNumBatches)
+                                                                     Long 
expectedBatchSize, Integer expectedNumBatches, Integer expectedTotalRecords)
       throws SchemaChangeException, UnsupportedEncodingException {
     Map<String, List<Object>> combinedVectors = new TreeMap<>();
-    addToCombinedVectorResults(batches, null, expectedBatchSize, 
expectedNumBatches, combinedVectors);
+    addToCombinedVectorResults(batches, null, expectedBatchSize, 
expectedNumBatches, combinedVectors, expectedTotalRecords);
     return combinedVectors;
   }
 
@@ -340,7 +341,7 @@ public class DrillTestWrapper {
    */
   public static int addToCombinedVectorResults(Iterable<VectorAccessible> 
batches, BatchSchema expectedSchema,
                                                Long expectedBatchSize, Integer 
expectedNumBatches,
-                                               Map<String, List<Object>> 
combinedVectors)
+                                               Map<String, List<Object>> 
combinedVectors, Integer expectedTotalRecords)
        throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     int numBatch = 0;
@@ -443,6 +444,9 @@ public class DrillTestWrapper {
       Assert.assertTrue(numBatch <= (2*expectedNumBatches));
     }
 
+    if ( expectedTotalRecords != null ) {
+      Assert.assertEquals(expectedTotalRecords.longValue(), totalRecords);
+    }
     return numBatch;
   }
 
@@ -562,7 +566,7 @@ public class DrillTestWrapper {
       addTypeInfoIfMissing(actual.get(0), testBuilder);
 
       BatchIterator batchIter = new BatchIterator(actual, loader);
-      actualSuperVectors = addToCombinedVectorResults(batchIter, null, null);
+      actualSuperVectors = addToCombinedVectorResults(batchIter, null, null, 
null);
       batchIter.close();
 
       // If baseline data was not provided to the test builder directly, we 
must run a query for the baseline, this includes
@@ -575,7 +579,7 @@ public class DrillTestWrapper {
           test(baselineOptionSettingQueries);
           expected = testRunAndReturn(baselineQueryType, 
testBuilder.getValidationQuery());
           BatchIterator exBatchIter = new BatchIterator(expected, loader);
-          expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, 
null);
+          expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, 
null, null);
           exBatchIter.close();
         }
       } else {
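
A brief usage sketch for the new expectedTotalRecords argument above; the
TotalRowsSketch/collectAndCheck names and the row count are hypothetical, and
the batches argument stands for the operator output iterated by the test
harness.

    import java.util.List;
    import java.util.Map;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.test.DrillTestWrapper;

    class TotalRowsSketch {
      static Map<String, List<Object>> collectAndCheck(Iterable<VectorAccessible> batches)
          throws Exception {
        return DrillTestWrapper.addToCombinedVectorResults(
            batches,
            null,     // expectedBatchSize: not checked here
            null,     // expectedNumBatches: not checked here
            2_509);   // expectedTotalRecords: assert this many rows in total
      }
    }
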
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
index 02156f6..a3cd918 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBatch.java
@@ -105,4 +105,7 @@ public class RowSetBatch implements RecordBatch {
   public Iterator<VectorWrapper<?>> iterator() {
     return rowSet.container().iterator();
   }
+
+  @Override
+  public VectorContainer getContainer() { return rowSet.container(); }
 }
diff --git a/exec/java-exec/src/test/resources/empty.json 
b/exec/java-exec/src/test/resources/empty.json
new file mode 100644
index 0000000..e69de29
