Shiva Jahangiri has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3422 )


Change subject: Step 1: Introduce a partition manager for hash join
......................................................................

Step 1: Introduce a partition manager for hash join

Change-Id: I7a5609f6709efa427769da33c3a3b93233984fd8
---
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
2 files changed, 73 insertions(+), 25 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/22/3422/1

diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
new file mode 100644
index 0000000..59d90d6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.BitSet;
+
+public class HashJoinPartitionManager {
+
+    private final int numOfPartitions;
+    private final BitSet spilledStatus; // 0=resident, 1=spilled
+
+    public HashJoinPartitionManager(int numOfPartitions) {
+        this.numOfPartitions = numOfPartitions;
+        this.spilledStatus = new BitSet(numOfPartitions);
+    }
+
+    public BitSet getSpilledStatus() {
+        return spilledStatus;
+    }
+
+    public void setSpilledStatus(int pid) {
+        // BitSet.length() only reflects the highest set bit (and is 0 when the
+        // set is empty), so bound the check with the fixed partition count.
+        if (pid >= 0 && pid < numOfPartitions) {
+            spilledStatus.set(pid);
+        }
+    }
+
+    public void clearSpilledStatus() {
+        spilledStatus.clear();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..e50b19a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -56,6 +56,7 @@

     // Used for special probe BigObject which can not be held into the Join memory
     private FrameTupleAppender bigProbeFrameAppender;
+    private final HashJoinPartitionManager partitionManager;

     public enum SIDE {
         BUILD,
@@ -81,7 +82,6 @@
     private final boolean isLeftOuter;
     private final IMissingWriter[] nonMatchWriters;

-    private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
     private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
@@ -108,6 +108,7 @@
             String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
             RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
             IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+        this.partitionManager = new HashJoinPartitionManager(numOfPartitions);
         this.ctx = ctx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
@@ -129,8 +130,6 @@
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;

-        this.spilledStatus = new BitSet(numOfPartitions);
-
         this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -142,11 +141,11 @@
     public void initBuild() throws HyracksDataException {
         framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
         bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
-        bufferManager = new VPartitionTupleBufferManager(
-                PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
-                numOfPartitions, framePool);
-        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
-        spilledStatus.clear();
+        bufferManager = new VPartitionTupleBufferManager(PreferToSpillFullyOccupiedFramePolicy
+                .createAtMostOneFrameForSpilledPartitionConstrain(partitionManager.getSpilledStatus()), numOfPartitions,
+                framePool);
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, partitionManager.getSpilledStatus());
+        partitionManager.clearSpilledStatus();
         buildPSizeInTups = new int[numOfPartitions];
     }

@@ -181,7 +180,7 @@
         RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
         bufferManager.flushPartition(pid, writer);
         bufferManager.clearPartition(pid);
-        spilledStatus.set(pid);
+        partitionManager.setSpilledStatus(pid);
     }

     private void closeBuildPartition(int pid) throws HyracksDataException {
@@ -250,8 +249,8 @@
                 break;
         }
         try {
-            for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
-                    spilledStatus.nextSetBit(pid + 1)) {
+            for (int pid = partitionManager.getSpilledStatus().nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+                    partitionManager.getSpilledStatus().nextSetBit(pid + 1)) {
                 if (bufferManager.getNumTuples(pid) > 0) {
                     bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
                     bufferManager.clearPartition(pid);
@@ -280,12 +279,12 @@
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
         // we need number of |spilledPartitions| buffers to store the probe data
         int frameSize = ctx.getInitialFrameSize();
-        long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
+        long freeSpace = (long) (memSizeInFrames - partitionManager.getSpilledStatus().cardinality()) * frameSize;

         // For partitions in main memory, we deduct their size from the free space.
         int inMemTupCount = 0;
-        for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
-                spilledStatus.nextClearBit(p + 1)) {
+        for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                partitionManager.getSpilledStatus().nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
             inMemTupCount += buildPSizeInTups[p];
         }
@@ -318,8 +317,8 @@
             } else {
                 // There is no single suitable partition. So, we need to spill multiple partitions to the disk
                 // in order to accommodate the hash table.
-                for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
-                        spilledStatus.nextClearBit(p + 1)) {
+                for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                        partitionManager.getSpilledStatus().nextClearBit(p + 1)) {
                     int spaceToBeReturned = bufferManager.getPhysicalSize(p);
                     int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
                     if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
@@ -377,8 +376,8 @@
         long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
         int minSpaceAfterSpillPartID = -1;

-        for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
-                spilledStatus.nextClearBit(p + 1)) {
+        for (int p = partitionManager.getSpilledStatus().nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                partitionManager.getSpilledStatus().nextClearBit(p + 1)) {
             if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
                 continue;
             }
@@ -398,8 +397,8 @@
     }

     private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
-                spilledStatus.nextSetBit(i + 1)) {
+        for (int i = partitionManager.getSpilledStatus().nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
+                partitionManager.getSpilledStatus().nextSetBit(i + 1)) {
             int spilledTupleCount = buildPSizeInTups[i];
             // Expected hash table size increase after reloading this partition
             long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
@@ -435,7 +434,7 @@
         } finally {
             r.close();
         }
-        spilledStatus.set(pid, false);
+        partitionManager.getSpilledStatus().set(pid, false);
         buildRFWriters[pid] = null;
         return true;
     }
@@ -450,7 +449,7 @@
     private void loadDataInMemJoin() throws HyracksDataException {

         for (int pid = 0; pid < numOfPartitions; pid++) {
-            if (!spilledStatus.get(pid)) {
+            if (!partitionManager.getSpilledStatus().get(pid)) {
                 bufferManager.flushPartition(pid, new IFrameWriter() {
                     @Override
                     public void open() throws HyracksDataException {
@@ -496,7 +495,7 @@
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);

             if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
-                if (spilledStatus.get(pid)) { //pid is Spilled
+                if (partitionManager.getSpilledStatus().get(pid)) { //pid is Spilled
                     while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
                         int victim = pid;
                         if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one
@@ -531,7 +530,7 @@
     }

     private boolean isBuildRelAllInMemory() {
-        return spilledStatus.nextSetBit(0) < 0;
+        return partitionManager.getSpilledStatus().nextSetBit(0) < 0;
     }

     public void completeProbe(IFrameWriter writer) throws HyracksDataException {
@@ -597,10 +596,11 @@
     }

     public BitSet getPartitionStatus() {
-        return spilledStatus;
+        return partitionManager.getSpilledStatus();
     }

     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
+
 }

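As a reading aid, here is a minimal self-contained sketch of how the new HashJoinPartitionManager is driven by OptimizedHybridHashJoin above. The wrapper class, main() driver, and example partition ids are illustrative only and are not part of this patch; the sketch assumes HashJoinPartitionManager from this change is on the classpath.

import java.util.BitSet;

public class PartitionManagerUsageSketch {
    public static void main(String[] args) {
        final int numOfPartitions = 8;
        HashJoinPartitionManager partitionManager = new HashJoinPartitionManager(numOfPartitions);

        // initBuild(): every partition starts memory-resident (all bits clear).
        partitionManager.clearSpilledStatus();

        // spillPartition(pid): once a partition's frames are flushed to its run
        // file, record it as spilled (pids 2 and 5 are example values).
        partitionManager.setSpilledStatus(2);
        partitionManager.setSpilledStatus(5);

        // closeBuild()-style scan: visit only the spilled partitions, exactly as
        // the rewritten nextSetBit loops in the diff do.
        BitSet spilledStatus = partitionManager.getSpilledStatus();
        for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions;
                pid = spilledStatus.nextSetBit(pid + 1)) {
            System.out.println("partition " + pid + " was spilled");
        }

        // reloadSpilledPartitionToMem()-style transition: mark partition 2 resident again.
        spilledStatus.set(2, false);
        System.out.println("build side all in memory: " + (spilledStatus.nextSetBit(0) < 0));
    }
}

Note that the join still mutates the BitSet directly through getSpilledStatus() in several places (e.g., set(pid, false) when reloading a partition), so the manager does not yet fully encapsulate partition state; presumably later steps of this refactoring will move those call sites behind dedicated methods.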
--
To view, visit https://asterix-gerrit.ics.uci.edu/3422
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7a5609f6709efa427769da33c3a3b93233984fd8
Gerrit-Change-Number: 3422
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <[email protected]>
