Shiva Jahangiri has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/3421


Change subject: Refactoring OptimizedHybridHashJoin: Introduced RunFilesManager 
and PartitionsManager
......................................................................

Refactoring OptimizedHybridHashJoin
Introduced RunFilesManager and PartitionsManager

Change-Id: I3f6d011f8af256b290cc28a04a412bcbd005920a
---
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
6 files changed, 334 insertions(+), 206 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/21/3421/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 6c08be2..a435919 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -113,4 +113,12 @@
      */
     void clearPartition(int partition) throws HyracksDataException;

+    /**
+     * Flushes partition {@code pid} to {@code writer} and then clears it.
+     * @param pid the partition to flush and clear
+     * @param writer the frame writer that receives the flushed frames
+     * @throws HyracksDataException
+     */
+    void flushAndClearPartition(int pid, IFrameWriter writer) throws 
HyracksDataException;
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..a11eab5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -125,6 +125,12 @@
     }

     @Override
+    public void flushAndClearPartition(int pid, IFrameWriter writer) throws 
HyracksDataException {
+        flushPartition(pid, writer);
+        clearPartition(pid);
+    }
+
+    @Override
     public boolean insertTuple(int partition, byte[] byteArray, int[] 
fieldEndOffsets, int start, int size,
             TuplePointer pointer) throws HyracksDataException {
         int actualSize = calculateActualSize(fieldEndOffsets, size);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
new file mode 100644
index 0000000..f05f986
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class HashJoinPartitionsManager {
+    private int[] buildPSizeInTups;
+    private int[] probePSizeInTups;
+    private final BitSet spilledStatus; //0=resident, 1=spilled
+
+    HashJoinPartitionsManager(int numberOfPartitions) {
+        this.buildPSizeInTups = new int[numberOfPartitions];
+        this.probePSizeInTups = new int[numberOfPartitions];
+        this.spilledStatus = new BitSet(numberOfPartitions);
+    }
+
+    public BitSet getSpilledStatus() {
+        return spilledStatus;
+    }
+
+    public void setPartitionAsSpilled(int pid) {
+        this.spilledStatus.set(pid);
+    }
+
+    public int getBuildPartitionSizeInTup(int pid) {
+        return (buildPSizeInTups[pid]);
+    }
+
+    public int getProbePartitionSizeInTup(int pid) {
+        return (probePSizeInTups[pid]);
+    }
+
+    public boolean isBuildRelAllInMemory() {
+        return spilledStatus.nextSetBit(0) < 0;
+    }
+
+    public int getMaxBuildPartitionSize() {
+        return Arrays.stream(buildPSizeInTups).max().getAsInt();
+    }
+
+    public int getMaxProbePartitionSize() {
+        return Arrays.stream(probePSizeInTups).max().getAsInt();
+    }
+
+    public void incrementNumOfTuplesByOne(int pid, 
OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+        switch (whichSide) {
+            case BUILD:
+                buildPSizeInTups[pid]++;
+                break;
+            case PROBE:
+                probePSizeInTups[pid]++;
+                break;
+            default:
+                throw new HyracksDataException("Side has to be either Build or 
Probe.");
+        }
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
new file mode 100644
index 0000000..9841737
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinRunFilesManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+
+public class HashJoinRunFilesManager {
+    private IHyracksTaskContext ctx;
+    private final String buildRelName;
+    private final String probeRelName;
+    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+
+    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
+    HashJoinRunFilesManager(int numberOfPartitions, IHyracksTaskContext ctx, 
String buildRelName, String probeRelName) {
+        this.ctx = ctx;
+        this.buildRelName = buildRelName;
+        this.probeRelName = probeRelName;
+        this.buildRFWriters = new RunFileWriter[numberOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numberOfPartitions];
+    }
+
+    /**
+     * In case of failure happens, we need to clear up the generated temporary 
files.
+     */
+    public void clearProbeTempFiles() throws HyracksDataException {
+        for (int i = 0; i < probeRFWriters.length; i++) {
+            if (probeRFWriters[i] != null) {
+                probeRFWriters[i].erase();
+            }
+        }
+    }
+
+    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException 
{
+        return ((buildRFWriters[pid] == null) ? null : 
(buildRFWriters[pid]).createDeleteOnCloseReader());
+    }
+
+    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException 
{
+        return ((probeRFWriters[pid] == null) ? null : 
(probeRFWriters[pid]).createDeleteOnCloseReader());
+    }
+
+    private void closeBuildPartition(int pid) throws HyracksDataException {
+        if (buildRFWriters[pid] == null) {
+            throw new HyracksDataException("Tried to close the non-existing 
file writer.");
+        }
+        buildRFWriters[pid].close();
+    }
+
+    public void setRunFileWriter(int pid, OptimizedHybridHashJoin.SIDE 
whichSide, RunFileWriter rfw) {
+        RunFileWriter runFileWriter = null;
+        switch (whichSide) {
+            case BUILD:
+                buildRFWriters[pid] = rfw;
+                break;
+            case PROBE:
+                probeRFWriters[pid] = rfw;
+                break;
+        }
+    }
+
+    public RunFileWriter getSpillWriterOrCreateIfNotExist(int pid, 
OptimizedHybridHashJoin.SIDE whichSide)
+            throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        String refName = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                refName = buildRelName;
+                break;
+            case PROBE:
+                refName = probeRelName;
+                runFileWriters = probeRFWriters;
+                break;
+        }
+        RunFileWriter writer = runFileWriters[pid];
+        if (writer == null) {
+            FileReference file = 
ctx.getJobletContext().createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, ctx.getIoManager());
+            writer.open();
+            runFileWriters[pid] = writer;
+        }
+        return writer;
+    }
+
+    /**
+     * In case of failure happens, we need to clear up the generated temporary 
files.
+     */
+    public void clearBuildTempFiles() throws HyracksDataException {
+        for (int i = 0; i < buildRFWriters.length; i++) {
+            if (buildRFWriters[i] != null) {
+                buildRFWriters[i].erase();
+            }
+        }
+    }
+
+    public void closeAllSpilledPartitions(OptimizedHybridHashJoin.SIDE 
whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                break;
+            case PROBE:
+                runFileWriters = probeRFWriters;
+                break;
+        }
+
+        if (runFileWriters != null) {
+            for (RunFileWriter runFileWriter : runFileWriters) {
+                if (runFileWriter != null) {
+                    runFileWriter.close();
+                }
+            }
+        }
+    }
+
+    public void closeSpilledPartition(int pid, OptimizedHybridHashJoin.SIDE 
whichSide) throws HyracksDataException {
+        RunFileWriter runFileWriter = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriter = buildRFWriters[pid];
+                break;
+            case PROBE:
+                runFileWriter = probeRFWriters[pid];
+                break;
+        }
+        if (runFileWriter != null) {
+            runFileWriter.close();
+        }
+    }
+
+    public RunFileWriter[] getBuildRFWriters() {
+        return buildRFWriters;
+    }
+
+    public RunFileWriter[] getProbeRFWriters() {
+        return probeRFWriters;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..bb27fdd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -56,6 +55,8 @@

     // Used for special probe BigObject which can not be held into the Join 
memory
     private FrameTupleAppender bigProbeFrameAppender;
+    private HashJoinPartitionsManager partitionManager;
+    private HashJoinRunFilesManager runFilesManager;

     public enum SIDE {
         BUILD,
@@ -74,14 +75,10 @@
     private final RecordDescriptor buildRd;
     private final RecordDescriptor probeRd;

-    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
-    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
     private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final IMissingWriter[] nonMatchWriters;

-    private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
     private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident 
partitions
@@ -98,16 +95,17 @@
     private boolean isReversed; //Added for handling correct calling for 
predicate-evaluator upon recursive calls that cause role-reversal

     // stats information
-    private int[] buildPSizeInTups;
+
     private IFrame reloadBuffer;
     private TuplePointer tempPtr = new TuplePointer(); // this is a reusable 
object to store the pointer,which is not used anywhere.
                                                        // we mainly use it to 
match the corresponding function signature.
-    private int[] probePSizeInTups;

     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int 
memSizeInFrames, int numOfPartitions,
             String probeRelName, String buildRelName, ITuplePairComparator 
comparator, RecordDescriptor probeRd,
             RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, 
ITuplePartitionComputer buildHpc,
             IPredicateEvaluator predEval, boolean isLeftOuter, 
IMissingWriterFactory[] nullWriterFactories1) {
+        this.partitionManager = new HashJoinPartitionsManager(numOfPartitions);
+        this.runFilesManager = new HashJoinRunFilesManager(numOfPartitions, 
ctx, buildRelName, probeRelName);
         this.ctx = ctx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
@@ -117,10 +115,7 @@
         this.comparator = comparator;
         this.buildRelName = buildRelName;
         this.probeRelName = probeRelName;
-
         this.numOfPartitions = numOfPartitions;
-        this.buildRFWriters = new RunFileWriter[numOfPartitions];
-        this.probeRFWriters = new RunFileWriter[numOfPartitions];

         this.accessorBuild = new FrameTupleAccessor(buildRd);
         this.accessorProbe = new FrameTupleAccessor(probeRd);
@@ -128,8 +123,6 @@
         this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;
-
-        this.spilledStatus = new BitSet(numOfPartitions);

         this.nonMatchWriters = isLeftOuter ? new 
IMissingWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
@@ -142,12 +135,10 @@
     public void initBuild() throws HyracksDataException {
         framePool = new DeallocatableFramePool(ctx, memSizeInFrames * 
ctx.getInitialFrameSize());
         bufferManagerForHashTable = new 
FramePoolBackedFrameBufferManager(framePool);
-        bufferManager = new VPartitionTupleBufferManager(
-                
PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
-                numOfPartitions, framePool);
-        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, 
spilledStatus);
-        spilledStatus.clear();
-        buildPSizeInTups = new int[numOfPartitions];
+        bufferManager = new 
VPartitionTupleBufferManager(PreferToSpillFullyOccupiedFramePolicy
+                
.createAtMostOneFrameForSpilledPartitionConstrain(partitionManager.getSpilledStatus()),
 numOfPartitions,
+                framePool);
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, 
partitionManager.getSpilledStatus());
     }

     public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -157,15 +148,14 @@
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
             processTuple(i, pid);
-            buildPSizeInTups[pid]++;
         }
-
     }

     private void processTuple(int tid, int pid) throws HyracksDataException {
         while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
             selectAndSpillVictim(pid);
         }
+        partitionManager.incrementNumOfTuplesByOne(pid, SIDE.BUILD);
     }

     private void selectAndSpillVictim(int pid) throws HyracksDataException {
@@ -178,40 +168,8 @@
     }

     private void spillPartition(int pid) throws HyracksDataException {
-        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, 
SIDE.BUILD);
-        bufferManager.flushPartition(pid, writer);
-        bufferManager.clearPartition(pid);
-        spilledStatus.set(pid);
-    }
-
-    private void closeBuildPartition(int pid) throws HyracksDataException {
-        if (buildRFWriters[pid] == null) {
-            throw new HyracksDataException("Tried to close the non-existing 
file writer.");
-        }
-        buildRFWriters[pid].close();
-    }
-
-    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE 
whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        String refName = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                refName = buildRelName;
-                break;
-            case PROBE:
-                refName = probeRelName;
-                runFileWriters = probeRFWriters;
-                break;
-        }
-        RunFileWriter writer = runFileWriters[pid];
-        if (writer == null) {
-            FileReference file = 
ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIoManager());
-            writer.open();
-            runFileWriters[pid] = writer;
-        }
-        return writer;
+        bufferManager.flushAndClearPartition(pid, 
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, SIDE.BUILD));
+        partitionManager.setPartitionAsSpilled(pid);
     }

     public void closeBuild() throws HyracksDataException {
@@ -229,47 +187,6 @@
     }

     /**
-     * In case of failure happens, we need to clear up the generated temporary 
files.
-     */
-    public void clearBuildTempFiles() throws HyracksDataException {
-        for (int i = 0; i < buildRFWriters.length; i++) {
-            if (buildRFWriters[i] != null) {
-                buildRFWriters[i].erase();
-            }
-        }
-    }
-
-    private void closeAllSpilledPartitions(SIDE whichSide) throws 
HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                break;
-            case PROBE:
-                runFileWriters = probeRFWriters;
-                break;
-        }
-        try {
-            for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < 
numOfPartitions; pid =
-                    spilledStatus.nextSetBit(pid + 1)) {
-                if (bufferManager.getNumTuples(pid) > 0) {
-                    bufferManager.flushPartition(pid, 
getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
-                    bufferManager.clearPartition(pid);
-                }
-            }
-        } finally {
-            // Force to close all run file writers.
-            if (runFileWriters != null) {
-                for (RunFileWriter runFileWriter : runFileWriters) {
-                    if (runFileWriter != null) {
-                        runFileWriter.close();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
      * Makes the space for the hash table. If there is no enough space, one or 
more partitions will be spilled
      * to the disk until the hash table can fit into the memory. After this, 
bring back spilled partitions
      * if there is available memory.
@@ -280,6 +197,7 @@
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws 
HyracksDataException {
         // we need number of |spilledPartitions| buffers to store the probe 
data
         int frameSize = ctx.getInitialFrameSize();
+        BitSet spilledStatus = partitionManager.getSpilledStatus();
         long freeSpace = (long) (memSizeInFrames - 
spilledStatus.cardinality()) * frameSize;

         // For partitions in main memory, we deduct their size from the free 
space.
@@ -287,7 +205,7 @@
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < 
numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
-            inMemTupCount += buildPSizeInTups[p];
+            inMemTupCount += partitionManager.getBuildPartitionSizeInTup(p);
         }

         // Calculates the expected hash table size for the given number of 
tuples in main memory
@@ -309,11 +227,11 @@
                 // There is a suitable one. We spill that partition to the 
disk.
                 long hashTableSizeDecrease =
                         
-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-                                -buildPSizeInTups[pidToSpill], frameSize);
+                                
-partitionManager.getBuildPartitionSizeInTup(pidToSpill), frameSize);
                 freeSpace = freeSpace + 
bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
-                inMemTupCount -= buildPSizeInTups[pidToSpill];
+                inMemTupCount -= 
partitionManager.getBuildPartitionSizeInTup(pidToSpill);
                 spillPartition(pidToSpill);
-                closeBuildPartition(pidToSpill);
+                runFilesManager.closeSpilledPartition(pidToSpill, SIDE.BUILD);
                 moreSpilled = true;
             } else {
                 // There is no single suitable partition. So, we need to spill 
multiple partitions to the disk
@@ -321,12 +239,12 @@
                 for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < 
numOfPartitions; p =
                         spilledStatus.nextClearBit(p + 1)) {
                     int spaceToBeReturned = bufferManager.getPhysicalSize(p);
-                    int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
+                    int numberOfTuplesToBeSpilled = 
partitionManager.getBuildPartitionSizeInTup(p);
                     if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 
0) {
                         continue;
                     }
                     spillPartition(p);
-                    closeBuildPartition(p);
+                    runFilesManager.closeSpilledPartition(p, SIDE.BUILD);
                     moreSpilled = true;
                     // Since the number of tuples in memory has been decreased,
                     // the hash table size will be decreased, too.
@@ -353,13 +271,13 @@
         // Brings back some partitions if there is enough free space.
         int pid = 0;
         while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) 
>= 0) {
-            if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+            if (!loadSpilledPartitionToMem(pid, 
runFilesManager.getBuildRFWriters()[pid])) {
                 break;
             }
-            long expectedHashTableByteSizeIncrease = SerializableHashTable
-                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, 
buildPSizeInTups[pid], frameSize);
+            long expectedHashTableByteSizeIncrease = 
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                    inMemTupCount, 
partitionManager.getBuildPartitionSizeInTup(pid), frameSize);
             freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - 
expectedHashTableByteSizeIncrease;
-            inMemTupCount += buildPSizeInTups[pid];
+            inMemTupCount += partitionManager.getBuildPartitionSizeInTup(pid);
             // Adjusts the hash table size
             hashTableByteSizeForInMemTuples += 
expectedHashTableByteSizeIncrease;
         }
@@ -377,14 +295,16 @@
         long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
         int minSpaceAfterSpillPartID = -1;

+        BitSet spilledStatus = partitionManager.getSpilledStatus();
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < 
numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
-            if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) 
== 0) {
+            if (partitionManager.getBuildPartitionSizeInTup(p) == 0 || 
bufferManager.getPhysicalSize(p) == 0) {
                 continue;
             }
             // We put minus since the method returns a negative value to 
represent a newly reclaimed space.
-            spaceAfterSpill = currentFreeSpace + 
bufferManager.getPhysicalSize(p) + (-SerializableHashTable
-                    
.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, 
-buildPSizeInTups[p], frameSize));
+            spaceAfterSpill = currentFreeSpace + 
bufferManager.getPhysicalSize(p)
+                    + 
(-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+                            -partitionManager.getBuildPartitionSizeInTup(p), 
frameSize));
             if (spaceAfterSpill == 0) {
                 // Found the perfect one. Just returns this partition.
                 return p;
@@ -398,13 +318,14 @@
     }

     private int selectPartitionsToReload(long freeSpace, int pid, int 
inMemTupCount) {
+        BitSet spilledStatus = partitionManager.getSpilledStatus();
         for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < 
numOfPartitions; i =
                 spilledStatus.nextSetBit(i + 1)) {
-            int spilledTupleCount = buildPSizeInTups[i];
+            int spilledTupleCount = 
partitionManager.getBuildPartitionSizeInTup(i);
             // Expected hash table size increase after reloading this partition
             long expectedHashTableByteSizeIncrease = 
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
                     inMemTupCount, spilledTupleCount, 
ctx.getInitialFrameSize());
-            if (freeSpace >= buildRFWriters[i].getFileSize() + 
expectedHashTableByteSizeIncrease) {
+            if (freeSpace >= 
runFilesManager.getBuildRFWriters()[i].getFileSize() + 
expectedHashTableByteSizeIncrease) {
                 return i;
             }
         }
@@ -435,8 +356,8 @@
         } finally {
             r.close();
         }
-        spilledStatus.set(pid, false);
-        buildRFWriters[pid] = null;
+        partitionManager.getSpilledStatus().set(pid, false);
+        runFilesManager.setRunFileWriter(pid, SIDE.BUILD, null);
         return true;
     }

@@ -450,7 +371,7 @@
     private void loadDataInMemJoin() throws HyracksDataException {

         for (int pid = 0; pid < numOfPartitions; pid++) {
-            if (!spilledStatus.get(pid)) {
+            if (!partitionManager.getSpilledStatus().get(pid)) {
                 bufferManager.flushPartition(pid, new IFrameWriter() {
                     @Override
                     public void open() throws HyracksDataException {
@@ -476,18 +397,11 @@
         }
     }

-    public void initProbe() throws HyracksDataException {
-
-        probePSizeInTups = new int[numOfPartitions];
-        probeRFWriters = new RunFileWriter[numOfPartitions];
-
-    }
-
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws 
HyracksDataException {
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();

-        if (isBuildRelAllInMemory()) {
+        if (partitionManager.isBuildRelAllInMemory()) {
             inMemJoiner.join(buffer, writer);
             return;
         }
@@ -495,8 +409,8 @@
         for (int i = 0; i < tupleCount; ++i) {
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);

-            if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has 
potential match from previous phase
-                if (spilledStatus.get(pid)) { //pid is Spilled
+            if (partitionManager.getBuildPartitionSizeInTup(pid) > 0 || 
isLeftOuter) { //Tuple has potential match from previous phase
+                if (partitionManager.getSpilledStatus().get(pid)) { //pid is 
Spilled
                     while (!bufferManager.insertTuple(pid, accessorProbe, i, 
tempPtr)) {
                         int victim = pid;
                         if (bufferManager.getNumTuples(pid) == 0) { // current 
pid is empty, choose the biggest one
@@ -506,38 +420,46 @@
                             flushBigProbeObjectToDisk(pid, accessorProbe, i);
                             break;
                         }
-                        RunFileWriter runFileWriter = 
getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+                        RunFileWriter runFileWriter =
+                                
runFilesManager.getSpillWriterOrCreateIfNotExist(victim, SIDE.PROBE);
                         bufferManager.flushPartition(victim, runFileWriter);
                         bufferManager.clearPartition(victim);
                     }
                 } else { //pid is Resident
                     inMemJoiner.join(i, writer);
                 }
-                probePSizeInTups[pid]++;
+                partitionManager.incrementNumOfTuplesByOne(pid, SIDE.PROBE);
             }
         }
     }

     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor 
accessorProbe, int i)
             throws HyracksDataException {
+        //TODO: The variable size frame is not using the join memory budget 
which can lead to using more memory than
+        // memory budget. Needs to be fixed in a way to either flush it right 
away without using a frame, or allocate
+        // frames to it by spilling some partitions if possible.
         if (bigProbeFrameAppender == null) {
             bigProbeFrameAppender = new FrameTupleAppender(new 
VSizeFrame(ctx));
         }
-        RunFileWriter runFileWriter = 
getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+        RunFileWriter runFileWriter = 
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, SIDE.PROBE);
         if (!bigProbeFrameAppender.append(accessorProbe, i)) {
             throw new HyracksDataException("The given tuple is too big");
         }
         bigProbeFrameAppender.write(runFileWriter, true);
     }

-    private boolean isBuildRelAllInMemory() {
-        return spilledStatus.nextSetBit(0) < 0;
-    }
-
     public void completeProbe(IFrameWriter writer) throws HyracksDataException 
{
         //We do NOT join the spilled partitions here, that decision is made at 
the descriptor level
         //(which join technique to use)
         inMemJoiner.completeJoin(writer);
+    }
+
+    public void closeAllSpilledPartitions(OptimizedHybridHashJoin.SIDE 
whichSide) throws HyracksDataException {
+        for (int pid = partitionManager.getSpilledStatus().nextSetBit(0); pid 
>= 0 && pid < numOfPartitions; pid =
+                partitionManager.getSpilledStatus().nextSetBit(pid + 1)) {
+            bufferManager.flushAndClearPartition(pid, 
runFilesManager.getSpillWriterOrCreateIfNotExist(pid, whichSide));
+            runFilesManager.closeSpilledPartition(pid, whichSide);
+        }
     }

     public void releaseResource() throws HyracksDataException {
@@ -549,58 +471,15 @@
         bufferManagerForHashTable = null;
     }

-    /**
-     * In case of failure happens, we need to clear up the generated temporary 
files.
-     */
-    public void clearProbeTempFiles() throws HyracksDataException {
-        for (int i = 0; i < probeRFWriters.length; i++) {
-            if (probeRFWriters[i] != null) {
-                probeRFWriters[i].erase();
-            }
-        }
-    }
-
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException 
{
-        return ((buildRFWriters[pid] == null) ? null : 
(buildRFWriters[pid]).createDeleteOnCloseReader());
-    }
-
-    public int getBuildPartitionSizeInTup(int pid) {
-        return (buildPSizeInTups[pid]);
-    }
-
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException 
{
-        return ((probeRFWriters[pid] == null) ? null : 
(probeRFWriters[pid]).createDeleteOnCloseReader());
-    }
-
-    public int getProbePartitionSizeInTup(int pid) {
-        return (probePSizeInTups[pid]);
-    }
-
-    public int getMaxBuildPartitionSize() {
-        int max = buildPSizeInTups[0];
-        for (int i = 1; i < buildPSizeInTups.length; i++) {
-            if (buildPSizeInTups[i] > max) {
-                max = buildPSizeInTups[i];
-            }
-        }
-        return max;
-    }
-
-    public int getMaxProbePartitionSize() {
-        int max = probePSizeInTups[0];
-        for (int i = 1; i < probePSizeInTups.length; i++) {
-            if (probePSizeInTups[i] > max) {
-                max = probePSizeInTups[i];
-            }
-        }
-        return max;
-    }
-
-    public BitSet getPartitionStatus() {
-        return spilledStatus;
-    }
-
     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
+
+    public HashJoinPartitionsManager getPartitionManager() {
+        return partitionManager;
+    }
+
+    public HashJoinRunFilesManager getRunFilesManager() {
+        return runFilesManager;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..daa3a35 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -305,7 +305,7 @@
                     if (state.hybridHJ != null) {
                         state.hybridHJ.closeBuild();
                         if (isFailed) {
-                            state.hybridHJ.clearBuildTempFiles();
+                            
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
                         } else {
                             ctx.setStateObject(state);
                             if (LOGGER.isTraceEnabled()) {
@@ -381,7 +381,6 @@
                             new TaskId(new ActivityId(getOperatorId(), 
BUILD_AND_PARTITION_ACTIVITY_ID), partition));

                     writer.open();
-                    state.hybridHJ.initProbe();

                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("OptimizedHybridHashJoin is starting the 
probe phase.");
@@ -404,8 +403,8 @@
                     if (failed) {
                         try {
                             // Clear temp files if fail() was called.
-                            state.hybridHJ.clearBuildTempFiles();
-                            state.hybridHJ.clearProbeTempFiles();
+                            
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
+                            
state.hybridHJ.getRunFilesManager().clearProbeTempFiles();
                         } finally {
                             writer.close(); // writer should always be closed.
                         }
@@ -418,12 +417,12 @@
                         } finally {
                             state.hybridHJ.releaseResource();
                         }
-                        BitSet partitionStatus = 
state.hybridHJ.getPartitionStatus();
+                        BitSet partitionStatus = 
state.hybridHJ.getPartitionManager().getSpilledStatus();
                         rPartbuff.reset();
                         for (int pid = partitionStatus.nextSetBit(0); pid >= 
0; pid =
                                 partitionStatus.nextSetBit(pid + 1)) {
-                            RunFileReader bReader = 
state.hybridHJ.getBuildRFReader(pid);
-                            RunFileReader pReader = 
state.hybridHJ.getProbeRFReader(pid);
+                            RunFileReader bReader = 
state.hybridHJ.getRunFilesManager().getBuildRFReader(pid);
+                            RunFileReader pReader = 
state.hybridHJ.getRunFilesManager().getProbeRFReader(pid);

                             if (bReader == null || pReader == null) {
                                 if (isLeftOuter && pReader != null) {
@@ -431,8 +430,8 @@
                                 }
                                 continue;
                             }
-                            int bSize = 
state.hybridHJ.getBuildPartitionSizeInTup(pid);
-                            int pSize = 
state.hybridHJ.getProbePartitionSizeInTup(pid);
+                            int bSize = 
state.hybridHJ.getPartitionManager().getBuildPartitionSizeInTup(pid);
+                            int pSize = 
state.hybridHJ.getPartitionManager().getProbePartitionSizeInTup(pid);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 
1);
                         }
                     } catch (Exception e) {
@@ -440,8 +439,8 @@
                         // to send the failure signal to the downstream, when 
there is a throwable thrown.
                         writer.fail();
                         // Clear temp files as this.fail() nor this.close() 
will no longer be called after close().
-                        state.hybridHJ.clearBuildTempFiles();
-                        state.hybridHJ.clearProbeTempFiles();
+                        
state.hybridHJ.getRunFilesManager().clearBuildTempFiles();
+                        
state.hybridHJ.getRunFilesManager().clearProbeTempFiles();
                         // Re-throw the whatever is caught.
                         throw e;
                     } finally {
@@ -587,7 +586,6 @@
                         probeSideReader.open();
                         rPartbuff.reset();
                         try {
-                            rHHj.initProbe();
                             while (probeSideReader.nextFrame(rPartbuff)) {
                                 rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
@@ -601,11 +599,11 @@
                     }

                     try {
-                        int maxAfterBuildSize = 
rHHj.getMaxBuildPartitionSize();
-                        int maxAfterProbeSize = 
rHHj.getMaxProbePartitionSize();
+                        int maxAfterBuildSize = 
rHHj.getPartitionManager().getMaxBuildPartitionSize();
+                        int maxAfterProbeSize = 
rHHj.getPartitionManager().getMaxProbePartitionSize();
                         int afterMax = Math.max(maxAfterBuildSize, 
maxAfterProbeSize);

-                        BitSet rPStatus = rHHj.getPartitionStatus();
+                        BitSet rPStatus = 
rHHj.getPartitionManager().getSpilledStatus();
                         if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * 
beforeMax))) {
                             //Case 2.1.1 - Keep applying HHJ
                             if (LOGGER.isDebugEnabled()) {
@@ -613,10 +611,10 @@
                                         + "(isLeftOuter || build<probe) - 
[Level " + level + "]");
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; 
rPid = rPStatus.nextSetBit(rPid + 1)) {
-                                RunFileReader rbrfw = 
rHHj.getBuildRFReader(rPid);
-                                RunFileReader rprfw = 
rHHj.getProbeRFReader(rPid);
-                                int rbSizeInTuple = 
rHHj.getBuildPartitionSizeInTup(rPid);
-                                int rpSizeInTuple = 
rHHj.getProbePartitionSizeInTup(rPid);
+                                RunFileReader rbrfw = 
rHHj.getRunFilesManager().getBuildRFReader(rPid);
+                                RunFileReader rprfw = 
rHHj.getRunFilesManager().getProbeRFReader(rPid);
+                                int rbSizeInTuple = 
rHHj.getPartitionManager().getBuildPartitionSizeInTup(rPid);
+                                int rpSizeInTuple = 
rHHj.getPartitionManager().getProbePartitionSizeInTup(rPid);

                                 if (rbrfw == null || rprfw == null) {
                                     if (isLeftOuter && rprfw != null) {
@@ -639,8 +637,8 @@
                                         + "(isLeftOuter || build<probe) - 
[Level " + level + "]");
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; 
rPid = rPStatus.nextSetBit(rPid + 1)) {
-                                RunFileReader rbrfw = 
rHHj.getBuildRFReader(rPid);
-                                RunFileReader rprfw = 
rHHj.getProbeRFReader(rPid);
+                                RunFileReader rbrfw = 
rHHj.getRunFilesManager().getBuildRFReader(rPid);
+                                RunFileReader rprfw = 
rHHj.getRunFilesManager().getProbeRFReader(rPid);

                                 if (rbrfw == null || rprfw == null) {
                                     if (isLeftOuter && rprfw != null) {
@@ -650,8 +648,8 @@
                                     continue;
                                 }

-                                int buildSideInTups = 
rHHj.getBuildPartitionSizeInTup(rPid);
-                                int probeSideInTups = 
rHHj.getProbePartitionSizeInTup(rPid);
+                                int buildSideInTups = 
rHHj.getPartitionManager().getBuildPartitionSizeInTup(rPid);
+                                int probeSideInTups = 
rHHj.getPartitionManager().getProbePartitionSizeInTup(rPid);
                                 // NLJ order is outer + inner, the order is 
reversed from the other joins
                                 if (isLeftOuter || probeSideInTups < 
buildSideInTups) {
                                     //checked-modified
@@ -665,8 +663,8 @@
                     } catch (Exception e) {
                         // Make sure that temporary run files generated in 
recursive hybrid hash joins
                         // are closed and deleted.
-                        rHHj.clearBuildTempFiles();
-                        rHHj.clearProbeTempFiles();
+                        rHHj.getRunFilesManager().clearBuildTempFiles();
+                        rHHj.getRunFilesManager().clearProbeTempFiles();
                         throw e;
                     }
                 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3421
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3f6d011f8af256b290cc28a04a412bcbd005920a
Gerrit-Change-Number: 3421
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <[email protected]>

Reply via email to