Taewoo Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1056

Change subject: ASTERIXDB-1566: Fixed External Hash Group By to conform to the 
memory budget
......................................................................

ASTERIXDB-1566: Fixed External Hash Group By to conform to the memory budget

 - External Hash Group By now conforms to the memory budget 
(compiler.groupmemory)
 - This is a temporary fix and another patchset will be submitted soon after we 
get the final design.

Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
8 files changed, 255 insertions(+), 133 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/56/1056/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index cd088c1..109d49d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.math.BigInteger;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.List;
@@ -248,6 +249,27 @@
         
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
         
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
 
+        // To set the number of hash values (slots) in external group by hash 
table,
+        // we try to get a prime number that is ranged between 80% * 
ceil(groupFrameLimit * framesize / 8) and
+        // 90% * ceil(groupFrameLimit * framesize / 8).
+        // Here, we assume that each hash entry pointer occupies 8 bytes 
(frame, offset).
+        // So, the number of hash entry pointer in a frame is ceil(framesize / 
8).
+        // The reason why we set the limit as 90% is that if it occupies 100% 
capacity,
+        // then there is no space for saving actual tuples.
+        int hashEntryMinSize = (int) Math.ceil(0.8 * groupFrameLimit * 
frameSize / 8);
+        int hashEntryMaxSize = (int) Math.ceil(0.9 * groupFrameLimit * 
frameSize / 8);
+        BigInteger tableSizePrimeNumber = 
BigInteger.valueOf(hashEntryMinSize).nextProbablePrime();
+        double capacityOccupationRatio = 0.8;
+        // If this number is bigger than 90%, then we try to find a prime 
number that fits within the budget.
+        while (tableSizePrimeNumber.longValue() > hashEntryMaxSize) {
+            // Try to reduce the ratio by 2% and try to get a prime number 
again.
+            capacityOccupationRatio -= 0.02;
+            hashEntryMinSize = (int) Math.ceil(capacityOccupationRatio * 
groupFrameLimit * frameSize / 8);
+            tableSizePrimeNumber = 
BigInteger.valueOf(hashEntryMinSize).nextProbablePrime();
+        }
+        OptimizationConfUtil.getPhysicalOptimizationConfig()
+                .setExternalGroupByTableSize(tableSizePrimeNumber.intValue());
+
         HeuristicCompilerFactoryBuilder builder =
                 new 
HeuristicCompilerFactoryBuilder(AqlOptimizationContextFactory.INSTANCE);
         
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index a0a9ab0..db9d7e5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -27,6 +27,9 @@
 
 public interface IPartitionedTupleBufferManager {
 
+    // Return the number of entire frames that are allocated to this buffer 
manager.
+    int getFrameCount();
+
     int getNumPartitions();
 
     int getNumTuples(int partition);
@@ -37,14 +40,16 @@
      * Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start, 
int size) into
      * specified partition. The handle is written into the tuplepointer.
      * <br>
-     * If {@code byteArray} contains the {@code fieldEndOffsets} already, then 
please set the {@code fieldEndOffsets} as NULL
+     * If {@code byteArray} contains the {@code fieldEndOffsets} already,
+     * then please set the {@code fieldEndOffsets} as NULL.
      *
      * @param partition
      *            the id of the partition to insert the tuple into
      * @param byteArray
      *            the byteArray which contains the tuple
      * @param fieldEndOffsets
-     *            the fieldEndOffsets which comes from the ArrayTupleBuilder, 
please set it to NULL if the {@code byteArray} already contains the 
fieldEndOffsets
+     *            the fieldEndOffsets which comes from the ArrayTupleBuilder, 
please set it to NULL
+     *            if the {@code byteArray} already contains the fieldEndOffsets
      * @param start
      *            the start offset in the {@code byteArray}
      * @param size
@@ -59,7 +64,7 @@
     /**
      * Insert tuple {@code tupleId} from the {@code tupleAccessor} into the 
given partition.
      * The returned handle is written into the tuplepointer
-     * 
+     *
      * @param partition
      *            the id of the partition to insert the tuple
      * @param tupleAccessor
@@ -67,7 +72,7 @@
      * @param tupleId
      *            the id of the tuple from the tupleAccessor
      * @param pointer
-     *            the returned pointer indicating the handler to later fetch 
the tuple from the buffer maanager
+     *            the returned pointer indicating the handler to later fetch 
the tuple from the buffer manager
      * @return true if the insertion succeed. Otherwise return false.
      * @throws HyracksDataException
      */
@@ -75,8 +80,9 @@
             throws HyracksDataException;
 
     /**
-     * Reset to the initial states. The previous allocated resources won't be 
released in order to be used in the next round.
-     * 
+     * Reset to the initial states. The previous allocated resources won't be 
released
+     * in order to be used in the next round.
+     *
      * @throws HyracksDataException
      */
     void reset() throws HyracksDataException;
@@ -93,7 +99,7 @@
      * This partition will not be cleared.
      * Currently it is used by Join where we flush the inner partition to the 
join (as a frameWriter),
      * but we will still keep the inner for the next outer partition.
-     * 
+     *
      * @param pid
      * @param writer
      * @throws HyracksDataException
@@ -102,7 +108,7 @@
 
     /**
      * Clear the memory occupation of the particular partition.
-     * 
+     *
      * @param partition
      * @throws HyracksDataException
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index f9387a9..6751296 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -40,14 +40,22 @@
     }
 
     public int selectVictimPartition(int failedToInsertPartition) {
-        // To avoid flush the half-full frame, it's better to spill itself.
+        // To avoid flush a half-full frame, it's better to spill itself.
         if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
             return failedToInsertPartition;
         }
+        // Try to find a victim partition from the already spilled partitions.
+        // The reason is that we want to keep in-memory partition (not spilled 
to the disk yet) as long as possible
+        // to reduce the overhead of writing to and reading from disk.
         int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
         int maxToSpillPartSize = 0;
-        // if we couldn't find the already spilled partition, or it is too 
small to flush that one,
+        // If we couldn't find an already spilled partition, or it is too 
small to flush that one,
         // try to flush an in memory partition.
+        //
+        // Note: right now, the 
createAtMostOneFrameForSpilledPartitionConstrain we are using for a spilled 
partition
+        // enforces that the number of maximum frame for a spilled partition 
is 1. So, the following first if statement
+        // is always true. i.e. we need to spill an in-memory partition anyway.
+        // But, when we will have another policy, the if statement will make 
more sense.
         if (partitionToSpill < 0
                 || (maxToSpillPartSize = 
bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) {
             int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
@@ -81,7 +89,7 @@
 
     /**
      * Create an constrain for the already spilled partition that it can only 
use at most one frame.
-     * 
+     *
      * @param spillStatus
      * @return
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index c193c3b..c1c6672 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -47,6 +47,8 @@
     private final FixedSizeFrameTupleAppender appender;
     private BufferInfo tempInfo;
     private final IPartitionedMemoryConstrain constrain;
+    // total number of frames that are allocated
+    private int totalFrameCount = 0;
 
     public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, 
IPartitionedMemoryConstrain constrain,
             int partitions, int frameLimitInBytes) throws HyracksDataException 
{
@@ -71,6 +73,12 @@
         }
         Arrays.fill(numTuples, 0);
         appendFrame.reset(null);
+        totalFrameCount = 0;
+    }
+
+    @Override
+    public int getFrameCount() {
+        return totalFrameCount;
     }
 
     @Override
@@ -101,6 +109,7 @@
         if (partition != null) {
             for (int i = 0; i < partition.getNumFrames(); ++i) {
                 framePool.deAllocateBuffer(partition.getFrame(i, 
tempInfo).getBuffer());
+                totalFrameCount--;
             }
         }
         partitionArray[partitionId].reset();
@@ -163,6 +172,8 @@
         ByteBuffer newBuffer = requestNewBufferFromPool(size);
         if (newBuffer == null) {
             return -1;
+        } else {
+            totalFrameCount++;
         }
         appendFrame.reset(newBuffer);
         appender.reset(appendFrame, true);
@@ -274,4 +285,5 @@
 
     }
 
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f08d27d..f62360c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -140,7 +140,7 @@
             }
 
             @Override
-            public boolean insert(IFrameTupleAccessor accessor, int tIndex) 
throws HyracksDataException {
+            public InsertResultType insert(IFrameTupleAccessor accessor, int 
tIndex) throws HyracksDataException {
                 int entryInHashTable = tpc.partition(accessor, tIndex, 
tableSize);
                 for (int i = 0; i < 
hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) {
                     hashTableForTuplePointer.getTuplePointer(entryInHashTable, 
i, pointer);
@@ -148,24 +148,30 @@
                     int c = ftpcInputCompareToAggregate.compare(accessor, 
tIndex, bufferAccessor);
                     if (c == 0) {
                         aggregateExistingTuple(accessor, tIndex, 
bufferAccessor, pointer.getTupleIndex());
-                        return true;
+                        return InsertResultType.SUCCESS;
                     }
                 }
 
                 return insertNewAggregateEntry(entryInHashTable, accessor, 
tIndex);
             }
 
-            private boolean insertNewAggregateEntry(int entryInHashTable, 
IFrameTupleAccessor accessor, int tIndex)
+            private InsertResultType insertNewAggregateEntry(int 
entryInHashTable, IFrameTupleAccessor accessor,
+                    int tIndex)
                     throws HyracksDataException {
                 initStateTupleBuilder(accessor, tIndex);
                 int pid = getPartition(entryInHashTable);
 
                 if (!bufferManager.insertTuple(pid, 
stateTupleBuilder.getByteArray(),
                         stateTupleBuilder.getFieldEndOffsets(), 0, 
stateTupleBuilder.getSize(), pointer)) {
-                    return false;
+                    return InsertResultType.FAIL;
                 }
                 hashTableForTuplePointer.insert(entryInHashTable, pointer);
-                return true;
+                // If the number of frames allocated to the data table and 
hash table exceeds the frame limit,
+                // we need to spill a partition to the disk to make a space.
+                if (bufferManager.getFrameCount() + 
hashTableForTuplePointer.getFrameCount() >= framesLimit) {
+                    return InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET;
+                }
+                return InsertResultType.SUCCESS;
             }
 
             private void initStateTupleBuilder(IFrameTupleAccessor accessor, 
int tIndex) throws HyracksDataException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
index 2b9ad54..970ea4a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
@@ -21,9 +21,19 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public interface ISpillableTable {
+
+    /**
+     * Result Type for an insertion.
+     */
+    public static enum InsertResultType {
+        SUCCESS,
+        FAIL,
+        // If a memory budget is given and if an insertion is successful,
+        // but exceeds the given budget, we return this code.
+        SUCCESS_BUT_EXCEEDS_BUDGET
+    }
 
     /**
      * Release all the storage resources.
@@ -45,7 +55,7 @@
      * @return
      * @throws HyracksDataException
      */
-    boolean insert(IFrameTupleAccessor accessor, int tIndex) throws 
HyracksDataException;
+    InsertResultType insert(IFrameTupleAccessor accessor, int tIndex) throws 
HyracksDataException;
 
     /**
      * Flush the certain partition to writer, and return the numOfTuples that 
have been flushed
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index e0ef2b3..ffd642a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.group.AggregateType;
 import org.apache.hyracks.dataflow.std.group.ISpillableTable;
+import org.apache.hyracks.dataflow.std.group.ISpillableTable.InsertResultType;
 
 public class ExternalHashGroupBy {
 
@@ -50,7 +51,8 @@
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
-            if (!table.insert(accessor, i)) {
+            InsertResultType result = table.insert(accessor, i);
+            if (result == InsertResultType.FAIL || result == 
InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET) {
                 do {
                     int partition = table.findVictimPartition(accessor, i);
                     if (partition < 0) {
@@ -58,7 +60,13 @@
                     }
                     RunFileWriter writer = 
getPartitionWriterOrCreateOneIfNotExist(partition);
                     flushPartitionToRun(partition, writer);
-                } while (!table.insert(accessor, i));
+                    if (result == InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET) 
{
+                        // If the result type is SUCCESS_BUT_EXCEEDS_BUDGET,
+                        // then we don't need to re-insert this tuple.
+                        break;
+                    }
+                    result = table.insert(accessor, i);
+                } while (result != InsertResultType.SUCCESS);
             }
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index b42cdb7..ca43e1e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -25,22 +25,33 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex;
+ * This table consists of header frames and content frames.
+ * .
+ * Header indicates the first entry slot location for the given integer value.
+ * A header slot consists of [content frame number], [offset in that frame] to 
get
+ * the first tuple's pointer information that shares the same value.
+ * .
+ * An entry slot in the content frame is as follows.
+ * [capacity of the slot], [# of occupied elements], {[frameIndex], 
[tupleIndex]}+;
  * fIndex, tIndex; .... <fIndex, tIndex> forms a tuple pointer
  */
 public class SerializableHashTable implements ISerializableTable {
 
     private static final int INT_SIZE = 4;
+    // Initial entry slot size
     private static final int INIT_ENTRY_SIZE = 4;
 
+    // Header frame array
     private IntSerDeBuffer[] headers;
+    // Content frame list
     private List<IntSerDeBuffer> contents = new ArrayList<>();
-    private List<Integer> frameCurrentIndex = new ArrayList<>();
+    private List<Integer> currentOffsetInFrameList = new ArrayList<>();
     private final IHyracksFrameMgrContext ctx;
     private final int frameCapacity;
-    private int currentLargestFrameIndex = 0;
+    private int currentLargestFrameNumber = 0;
     private int tupleCount = 0;
-    private int headerFrameCount = 0;
+    // The number of total frames that are allocated to the headers and 
contents
+    private int totalFrameCount = 0;
     private TuplePointer tempTuplePointer = new TuplePointer();
 
     public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext 
ctx) throws HyracksDataException {
@@ -53,61 +64,65 @@
 
         IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
         contents.add(frame);
-        frameCurrentIndex.add(0);
+        totalFrameCount++;
+        currentOffsetInFrameList.add(0);
         frameCapacity = frame.capacity();
     }
 
     @Override
     public void insert(int entry, TuplePointer pointer) throws 
HyracksDataException {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header == null) {
-            header = new IntSerDeBuffer(ctx.allocateFrame().array());
-            headers[hFrameIndex] = header;
-            resetFrame(header);
-            headerFrameCount++;
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+        if (headerFrame == null) {
+            headerFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
+            headers[headerFrameIndex] = headerFrame;
+            resetFrame(headerFrame);
+            totalFrameCount++;
         }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
-        if (frameIndex < 0) {
-            // insert first tuple into the entry
-            insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+        int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+        if (contentFrameIndex < 0) {
+            // Since the initial value of index and offset is -1, this means 
that the entry slot for
+            // this entry is not created yet. So, create the entry slot and 
insert first tuple into that slot.
+            insertNewEntry(headerFrame, offsetInHeaderFrame, INIT_ENTRY_SIZE, 
pointer);
         } else {
-            // insert non-first tuple into the entry
-            insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, 
pointer);
+            // The entry slot already exists. Insert non-first tuple into the 
entry slot
+            int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame 
+ 1);
+            insertNonFirstTuple(headerFrame, offsetInHeaderFrame, 
contentFrameIndex, offsetInContentFrame, pointer);
         }
         tupleCount++;
     }
 
     @Override
+    // Reset the slot information for the entry. Specifically, we reset the 
number of used count in the slot as 0.
     public void delete(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
         if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                frame.writeInt(offsetIndex + 1, 0);
-                tupleCount -= entryUsedItems;
+            int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+            if (contentFrameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 
1);
+                // Set used count as 0 in the slot.
+                frame.writeInt(offsetInContentFrame + 1, 0);
+                tupleCount -= entryUsedCountInSlot;
             }
         }
     }
 
     @Override
     public boolean getTuplePointer(int entry, int offset, TuplePointer 
dataPointer) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
         if (header == null) {
             dataPointer.reset(-1, -1);
             return false;
         }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
+        int frameIndex = header.getInt(offsetInHeaderFrame);
+        int offsetIndex = header.getInt(offsetInHeaderFrame + 1);
         if (frameIndex < 0) {
             dataPointer.reset(-1, -1);
             return false;
@@ -134,18 +149,18 @@
             if (frame != null)
                 resetFrame(frame);
 
-        frameCurrentIndex.clear();
+        currentOffsetInFrameList.clear();
         for (int i = 0; i < contents.size(); i++) {
-            frameCurrentIndex.add(0);
+            currentOffsetInFrameList.add(0);
         }
 
-        currentLargestFrameIndex = 0;
+        currentLargestFrameNumber = 0;
         tupleCount = 0;
     }
 
     @Override
     public int getFrameCount() {
-        return headerFrameCount + contents.size();
+        return totalFrameCount;
     }
 
     @Override
@@ -154,17 +169,20 @@
     }
 
     @Override
+    /**
+     * Returns the tuple count in the slot for the given entry.
+     */
     public int getTupleCount(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                return entryUsedItems;
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+        if (headerFrame != null) {
+            int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame 
+ 1);
+            if (contentFrameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 
1);
+                return entryUsedCountInSlot;
             }
         }
         return 0;
@@ -176,102 +194,134 @@
         for (int i = 0; i < headers.length; i++)
             headers[i] = null;
         contents.clear();
-        frameCurrentIndex.clear();
+        currentOffsetInFrameList.clear();
         tupleCount = 0;
-        currentLargestFrameIndex = 0;
+        totalFrameCount = 0;
+        currentLargestFrameNumber = 0;
         ctx.deallocateFrames(nFrames);
     }
 
-    private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int 
entryCapacity, TuplePointer pointer)
+    private void insertNewEntry(IntSerDeBuffer header, int 
offsetInHeaderFrame, int entryCapacity, TuplePointer pointer)
             throws HyracksDataException {
-        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
-        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+        IntSerDeBuffer lastContentFrame = 
contents.get(currentLargestFrameNumber);
+        int lastOffsetInCurrentFrame = 
currentOffsetInFrameList.get(currentLargestFrameNumber);
         int requiredIntCapacity = entryCapacity * 2;
-        int startFrameIndex = currentLargestFrameIndex;
+        int currentFrameNumber = currentLargestFrameNumber;
 
-        if (lastIndex + requiredIntCapacity >= frameCapacity) {
-            IntSerDeBuffer newFrame;
-            startFrameIndex++;
+        if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) {
+            IntSerDeBuffer newContentFrame;
+            currentFrameNumber++;
             do {
-                if (currentLargestFrameIndex >= contents.size() - 1) {
-                    newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
-                    currentLargestFrameIndex++;
-                    contents.add(newFrame);
-                    frameCurrentIndex.add(0);
+                if (currentLargestFrameNumber >= contents.size() - 1) {
+                    newContentFrame = new 
IntSerDeBuffer(ctx.allocateFrame().array());
+                    currentLargestFrameNumber++;
+                    contents.add(newContentFrame);
+                    totalFrameCount++;
+                    currentOffsetInFrameList.add(0);
                 } else {
-                    currentLargestFrameIndex++;
-                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
+                    currentLargestFrameNumber++;
+                    currentOffsetInFrameList.set(currentLargestFrameNumber, 0);
                 }
                 requiredIntCapacity -= frameCapacity;
             } while (requiredIntCapacity > 0);
-            lastIndex = 0;
-            lastFrame = contents.get(startFrameIndex);
+            lastOffsetInCurrentFrame = 0;
+            lastContentFrame = contents.get(currentFrameNumber);
         }
 
         // set header
-        header.writeInt(headerOffset, startFrameIndex);
-        header.writeInt(headerOffset + 1, lastIndex);
+        header.writeInt(offsetInHeaderFrame, currentFrameNumber);
+        header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame);
 
-        // set the entry
-        lastFrame.writeInt(lastIndex, entryCapacity - 1);
-        lastFrame.writeInt(lastIndex + 1, 1);
-        lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex());
-        lastFrame.writeInt(lastIndex + 3, pointer.getTupleIndex());
-        int newLastIndex = lastIndex + entryCapacity * 2;
-        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : 
frameCapacity - 1;
-        frameCurrentIndex.set(startFrameIndex, newLastIndex);
+        // set the entry & its slot.
+        // 1. slot capacity
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1);
+        // 2. used count in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1);
+        // 3. initial entry in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2, 
pointer.getFrameIndex());
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3, 
pointer.getTupleIndex());
+        int newLastOffsetInContentFrame = lastOffsetInCurrentFrame + 
entryCapacity * 2;
+        newLastOffsetInContentFrame = newLastOffsetInContentFrame < 
frameCapacity ? newLastOffsetInContentFrame
+                : frameCapacity - 1;
+        currentOffsetInFrameList.set(currentFrameNumber, 
newLastOffsetInContentFrame);
 
-        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - 
lastOffsetInCurrentFrame);
         while (requiredIntCapacity > 0) {
-            startFrameIndex++;
+            currentFrameNumber++;
             requiredIntCapacity -= frameCapacity;
-            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + 
frameCapacity : frameCapacity - 1;
-            frameCurrentIndex.set(startFrameIndex, newLastIndex);
+            newLastOffsetInContentFrame = requiredIntCapacity < 0 ? 
requiredIntCapacity + frameCapacity
+                    : frameCapacity - 1;
+            currentOffsetInFrameList.set(currentFrameNumber, 
newLastOffsetInContentFrame);
         }
     }
 
-    private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, 
int frameIndexArg, int offsetIndex,
+    private void insertNonFirstTuple(IntSerDeBuffer header, int 
offsetInHeaderFrame, int contentFrameIndex,
+            int offsetInContentFrame,
             TuplePointer pointer) throws HyracksDataException {
-        int frameIndex = frameIndexArg;
-        IntSerDeBuffer frame = contents.get(frameIndex);
-        int entryItems = frame.getInt(offsetIndex);
-        int entryUsedItems = frame.getInt(offsetIndex + 1);
-
-        if (entryUsedItems < entryItems) {
-            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
-            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
-            while (startIndex >= frameCapacity) {
+        int frameIndex = contentFrameIndex;
+        IntSerDeBuffer contentFrame = contents.get(frameIndex);
+        int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame);
+        int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame + 
1);
+        boolean frameIndexChanged = false;
+        if (entryUsedCountInSlot < entrySlotCapacity) {
+            // The slot has at least one space to accommodate this tuple 
pointer.
+            // Increase used count.
+            contentFrame.writeInt(offsetInContentFrame + 1, 
entryUsedCountInSlot + 1);
+            // Calculate the first empty spot in the slot.
+            // +2: (capacity, # of used entry count)
+            // *2: each tuplePointer's occupation (frame index + offset in 
that frame)
+            int startOffsetInContentFrame = offsetInContentFrame + 2 + 
entryUsedCountInSlot * 2;
+            while (startOffsetInContentFrame >= frameCapacity) {
                 ++frameIndex;
-                startIndex -= frameCapacity;
+                startOffsetInContentFrame -= frameCapacity;
+                frameIndexChanged = true;
             }
-            frame = contents.get(frameIndex);
-            frame.writeInt(startIndex, pointer.getFrameIndex());
-            frame.writeInt(startIndex + 1, pointer.getTupleIndex());
+            // We don't have to read content frame again if the frame index 
has not been changed.
+            if (frameIndexChanged) {
+                contentFrame = contents.get(frameIndex);
+            }
+            contentFrame.writeInt(startOffsetInContentFrame, 
pointer.getFrameIndex());
+            contentFrame.writeInt(startOffsetInContentFrame + 1, 
pointer.getTupleIndex());
         } else {
-            int capacity = (entryItems + 1) * 2;
-            header.writeInt(headerOffset, -1);
-            header.writeInt(headerOffset + 1, -1);
-            int fIndex = frame.getInt(offsetIndex + 2);
-            int tIndex = frame.getInt(offsetIndex + 3);
-            tempTuplePointer.reset(fIndex, tIndex);
-            this.insertNewEntry(header, headerOffset, capacity, 
tempTuplePointer);
-            int newFrameIndex = header.getInt(headerOffset);
-            int newTupleIndex = header.getInt(headerOffset + 1);
+            // There is no enough space in this slot.We need to increase the 
slot size and
+            // migrate the current entries in it.
 
-            for (int i = 1; i < entryUsedItems; i++) {
-                int startIndex = offsetIndex + 2 + i * 2;
+            // New capacity: double the original capacity
+            int capacity = (entrySlotCapacity + 1) * 2;
+            // Temporarily set the header as -1 for the slot.
+            header.writeInt(offsetInHeaderFrame, -1);
+            header.writeInt(offsetInHeaderFrame + 1, -1);
+            // Get the location of the initial entry.
+            int fIndex = contentFrame.getInt(offsetInContentFrame + 2);
+            int tIndex = contentFrame.getInt(offsetInContentFrame + 3);
+            tempTuplePointer.reset(fIndex, tIndex);
+            // Create a new double-sized slot for the current entries and
+            // migrate the initial entry in the slot to the new slot.
+            this.insertNewEntry(header, offsetInHeaderFrame, capacity, 
tempTuplePointer);
+            int newFrameIndex = header.getInt(offsetInHeaderFrame);
+            int newTupleIndex = header.getInt(offsetInHeaderFrame + 1);
+            boolean firstIterInTheLoop = true;
+
+            // Migrate the existing entries (from 2nd to the last).
+            for (int i = 1; i < entryUsedCountInSlot; i++) {
+                int startOffsetInContentFrame = offsetInContentFrame + 2 + i * 
2;
                 int startFrameIndex = frameIndex;
-                while (startIndex >= frameCapacity) {
+                while (startOffsetInContentFrame >= frameCapacity) {
                     ++startFrameIndex;
-                    startIndex -= frameCapacity;
+                    startOffsetInContentFrame -= frameCapacity;
                 }
-                frame = contents.get(startFrameIndex);
-                fIndex = frame.getInt(startIndex);
-                tIndex = frame.getInt(startIndex + 1);
+                if (firstIterInTheLoop || startFrameIndex != frameIndex) {
+                    // Only read content frame in case frameINdex is changed 
or if this is the first iteration.
+                    contentFrame = contents.get(startFrameIndex);
+                    firstIterInTheLoop = false;
+                }
+                fIndex = contentFrame.getInt(startOffsetInContentFrame);
+                tIndex = contentFrame.getInt(startOffsetInContentFrame + 1);
                 tempTuplePointer.reset(fIndex, tIndex);
-                insertNonFirstTuple(header, headerOffset, newFrameIndex, 
newTupleIndex, tempTuplePointer);
+                insertNonFirstTuple(header, offsetInHeaderFrame, 
newFrameIndex, newTupleIndex, tempTuplePointer);
             }
-            insertNonFirstTuple(header, headerOffset, newFrameIndex, 
newTupleIndex, pointer);
+            // Now, insert the new entry that caused an overflow to the old 
bucket.
+            insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, 
newTupleIndex, pointer);
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1056
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangs...@yahoo.com>

Reply via email to