Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1993

Change subject: [WIP] Introduce LinearProbeHashTable for InMemoryHashJoin
......................................................................

[WIP] Introduce LinearProbeHashTable for InMemoryHashJoin

WIP

Change-Id: I61e11d8223902cc34e81738660c1b3ed0ab86318
---
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
10 files changed, 398 insertions(+), 93 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/93/1993/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d9e6180..efef389 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -58,6 +58,7 @@
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
 import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
@@ -308,8 +309,8 @@
                     ITuplePartitionComputer hpc1 = new 
FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
                     int tableSize = (int) (state.memoryForHashtable * 
recordsPerFrame * factor);
-                    ISerializableTable table = new 
SimpleSerializableHashTable(tableSize, ctx);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new 
FrameTupleAccessor(rd0), hpc0,
+                    ISerializableTable table = new 
LinearProbeHashTable(tableSize, ctx);
+                    state.joiner = new InMemoryHashJoin(ctx, new 
FrameTupleAccessor(rd0), hpc0,
                             new FrameTupleAccessor(rd1), rd1, hpc1,
                             new FrameTuplePairComparator(keys0, keys1, 
comparators), isLeftOuter, nullWriters1, table,
                             predEvaluator, null);
@@ -497,7 +498,7 @@
                             } else {
                                 tableSize = (int) (memsize * recordsPerFrame * 
factor);
                             }
-                            ISerializableTable table = new 
SimpleSerializableHashTable(tableSize, ctx);
+                            ISerializableTable table = new 
LinearProbeHashTable(tableSize, ctx);
                             for (int partitionid = 0; partitionid < 
state.nPartitions; partitionid++) {
                                 RunFileWriter buildWriter = 
buildWriters[partitionid];
                                 RunFileWriter probeWriter = 
probeWriters[partitionid];
@@ -505,8 +506,8 @@
                                     continue;
                                 }
                                 table.reset();
-                                InMemoryHashJoin joiner = new 
InMemoryHashJoin(ctx, tableSize,
-                                        new FrameTupleAccessor(rd0), hpcRep0, 
new FrameTupleAccessor(rd1), rd1, hpcRep1,
+                                InMemoryHashJoin joiner = new 
InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0),
+                                        hpcRep0, new FrameTupleAccessor(rd1), 
rd1, hpcRep1,
                                         new FrameTuplePairComparator(keys0, 
keys1, comparators), isLeftOuter,
                                         nullWriters1, table, predEvaluator, 
null);
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ec1c3a9..b5248a4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -56,7 +56,6 @@
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuild;
     private final ISerializableTable table;
-    private final int tableSize;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of 
tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
@@ -67,23 +66,20 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(InMemoryHashJoin.class.getName());
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, 
FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor 
accessorBuild, RecordDescriptor rDBuild,
-            ITuplePartitionComputer tpcBuild, FrameTuplePairComparator 
comparator, boolean isLeftOuter,
-            IMissingWriter[] missingWritersBuild, ISerializableTable table, 
IPredicateEvaluator predEval,
-            ISimpleFrameBufferManager bufferManager)
+    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor 
accessorProbe, ITuplePartitionComputer tpcProbe,
+            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, 
ITuplePartitionComputer tpcBuild,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, 
IMissingWriter[] missingWritersBuild,
+            ISerializableTable table, IPredicateEvaluator predEval, 
ISimpleFrameBufferManager bufferManager)
             throws HyracksDataException {
-        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, rDBuild, 
tpcBuild, comparator, isLeftOuter,
+        this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, 
comparator, isLeftOuter,
                 missingWritersBuild, table, predEval, false, bufferManager);
     }
 
-    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, 
FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild,
-            RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild, 
FrameTuplePairComparator comparator,
-            boolean isLeftOuter, IMissingWriter[] missingWritersBuild, 
ISerializableTable table,
-            IPredicateEvaluator predEval, boolean reverse, 
ISimpleFrameBufferManager bufferManager)
-            throws HyracksDataException {
-        this.tableSize = tableSize;
+    public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor 
accessorProbe, ITuplePartitionComputer tpcProbe,
+            FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, 
ITuplePartitionComputer tpcBuild,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, 
IMissingWriter[] missingWritersBuild,
+            ISerializableTable table, IPredicateEvaluator predEval, boolean 
reverse,
+            ISimpleFrameBufferManager bufferManager) throws 
HyracksDataException {
         this.table = table;
         storedTuplePointer = new TuplePointer();
         buffers = new ArrayList<>();
@@ -109,12 +105,12 @@
         reverseOutputOrder = reverse;
         this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
         this.bufferManager = bufferManager;
-        if (tableSize != 0) {
+        if (table.getTableSize() != 0) {
             isTableCapacityNotZero = true;
         } else {
             isTableCapacityNotZero = false;
         }
-        LOGGER.fine("InMemoryHashJoin has been created for a table size of " + 
tableSize + " for Thread ID "
+        LOGGER.fine("InMemoryHashJoin has been created for a table size of " + 
table.getTableSize() + " for Thread ID "
                 + Thread.currentThread().getId() + ".");
     }
 
@@ -124,7 +120,7 @@
         accessorBuild.reset(buffer);
         int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpcBuild.partition(accessorBuild, i, tableSize);
+            int entry = tpcBuild.partition(accessorBuild, i, 
table.getTableSize());
             storedTuplePointer.reset(bIndex, i);
             // If an insertion fails, then tries to insert the same tuple 
pointer again after compacting the table.
             if (!table.insert(entry, storedTuplePointer)) {
@@ -160,7 +156,7 @@
     void join(int tid, IFrameWriter writer) throws HyracksDataException {
         boolean matchFound = false;
         if (isTableCapacityNotZero) {
-            int entry = tpcProbe.partition(accessorProbe, tid, tableSize);
+            int entry = tpcProbe.partition(accessorProbe, tid, 
table.getTableSize());
             int tupleCount = table.getTupleCount(entry);
             for (int i = 0; i < tupleCount; i++) {
                 table.getTuplePointer(entry, i, storedTuplePointer);
@@ -175,6 +171,7 @@
                         appendToResult(tid, tIndex, writer);
                     }
                 }
+                storedTuplePointer.reset(-1, -1);
             }
         }
         if (!matchFound && isLeftOuter) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 702dae6..081522a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
@@ -202,8 +203,8 @@
                             .createPartitioner();
                     state = new 
HashBuildTaskState(ctx.getJobletContext().getJobId(),
                             new TaskId(getActivityId(), partition));
-                    ISerializableTable table = new 
SerializableHashTable(tableSize, ctx, bufferManager);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new 
FrameTupleAccessor(rd0), hpc0,
+                    ISerializableTable table = new 
LinearProbeHashTable(tableSize, ctx);
+                    state.joiner = new InMemoryHashJoin(ctx, new 
FrameTupleAccessor(rd0), hpc0,
                             new FrameTupleAccessor(rd1), rd1, hpc1,
                             new FrameTuplePairComparator(keys0, keys1, 
comparators), isLeftOuter, nullWriters1, table,
                             predEvaluator, bufferManager);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 5de272a..2a8586a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -45,7 +45,7 @@
 import 
org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
 import 
org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
@@ -110,8 +110,7 @@
     private int[] probePSizeInTups;
 
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int 
memSizeInFrames, int numOfPartitions,
-            String probeRelName,
-            String buildRelName, int[] probeKeys, int[] buildKeys, 
IBinaryComparator[] comparators,
+            String probeRelName, String buildRelName, int[] probeKeys, int[] 
buildKeys, IBinaryComparator[] comparators,
             RecordDescriptor probeRd, RecordDescriptor buildRd, 
ITuplePartitionComputer probeHpc,
             ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, 
boolean isLeftOuter,
             IMissingWriterFactory[] nullWriterFactories1) {
@@ -301,8 +300,7 @@
 
         // Calculates the expected hash table size for the given number of 
tuples in main memory
         // and deducts it from the free space.
-        long hashTableByteSizeForInMemTuples = 
SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
-                frameSize);
+        long hashTableByteSizeForInMemTuples = 
LinearProbeHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
         freeSpace -= hashTableByteSizeForInMemTuples;
 
         // In the case where free space is less than zero after considering 
the hash table size,
@@ -317,8 +315,9 @@
             int pidToSpill = selectSinglePartitionToSpill(freeSpace, 
inMemTupCount, frameSize);
             if (pidToSpill >= 0) {
                 // There is a suitable one. We spill that partition to the 
disk.
-                long hashTableSizeDecrease = 
-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
-                        inMemTupCount, -buildPSizeInTups[pidToSpill], 
frameSize);
+                long hashTableSizeDecrease = -LinearProbeHashTable
+                        
.calculateByteSizeDeltaForTableSizeChange(inMemTupCount, 
-buildPSizeInTups[pidToSpill],
+                                frameSize);
                 freeSpace = freeSpace + 
bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
                 inMemTupCount -= buildPSizeInTups[pidToSpill];
                 spillPartition(pidToSpill);
@@ -340,7 +339,7 @@
                     // Since the number of tuples in memory has been decreased,
                     // the hash table size will be decreased, too.
                     // We put minus since the method returns a negative value 
to represent a newly reclaimed space.
-                    long expectedHashTableSizeDecrease = -SerializableHashTable
+                    long expectedHashTableSizeDecrease = -LinearProbeHashTable
                             
.calculateByteSizeDeltaForTableSizeChange(inMemTupCount, 
-numberOfTuplesToBeSpilled,
                                     frameSize);
                     freeSpace = freeSpace + spaceToBeReturned + 
expectedHashTableSizeDecrease;
@@ -356,8 +355,7 @@
         // If more partitions have been spilled to the disk, calculate the 
expected hash table size again
         // before bringing some partitions to main memory.
         if (moreSpilled) {
-            hashTableByteSizeForInMemTuples = 
SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
-                    frameSize);
+            hashTableByteSizeForInMemTuples = 
LinearProbeHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
         }
 
         // Brings back some partitions if there is enough free space.
@@ -366,7 +364,7 @@
             if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
                 break;
             }
-            long expectedHashTableByteSizeIncrease = SerializableHashTable
+            long expectedHashTableByteSizeIncrease = LinearProbeHashTable
                     .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, 
buildPSizeInTups[pid], frameSize);
             freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - 
expectedHashTableByteSizeIncrease;
             inMemTupCount += buildPSizeInTups[pid];
@@ -393,7 +391,7 @@
                 continue;
             }
             // We put minus since the method returns a negative value to 
represent a newly reclaimed space.
-            spaceAfterSpill = currentFreeSpace + 
bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+            spaceAfterSpill = currentFreeSpace + 
bufferManager.getPhysicalSize(p) + (-LinearProbeHashTable
                     
.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, 
-buildPSizeInTups[p], frameSize));
             if (spaceAfterSpill == 0) {
                 // Found the perfect one. Just returns this partition.
@@ -412,8 +410,9 @@
                 && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
             int spilledTupleCount = buildPSizeInTups[i];
             // Expected hash table size increase after reloading this partition
-            long expectedHashTableByteSizeIncrease = 
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
-                    inMemTupCount, spilledTupleCount, 
ctx.getInitialFrameSize());
+            long expectedHashTableByteSizeIncrease = LinearProbeHashTable
+                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, 
spilledTupleCount,
+                            ctx.getInitialFrameSize());
             if (freeSpace >= buildRFWriters[i].getFileSize() + 
expectedHashTableByteSizeIncrease) {
                 return i;
             }
@@ -451,8 +450,8 @@
     }
 
     private void createInMemoryJoiner(int inMemTupCount) throws 
HyracksDataException {
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, 
ctx, bufferManagerForHashTable);
-        this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new 
FrameTupleAccessor(probeRd), probeHpc,
+        ISerializableTable table = new LinearProbeHashTable(inMemTupCount, 
ctx);
+        this.inMemJoiner = new InMemoryHashJoin(ctx, new 
FrameTupleAccessor(probeRd), probeHpc,
                 new FrameTupleAccessor(buildRd), buildRd, buildHpc,
                 new FrameTuplePairComparator(probeKeys, buildKeys, 
comparators), isLeftOuter, nonMatchWriters, table,
                 predEvaluator, isReversed, bufferManagerForHashTable);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c44c583..ef96b8e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -64,6 +64,7 @@
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
@@ -492,9 +493,9 @@
                     }
 
                     // Calculate the expected hash table size for the both 
side.
-                    long expectedHashTableSizeForBuildInFrame = 
SerializableHashTable
+                    long expectedHashTableSizeForBuildInFrame = 
LinearProbeHashTable
                             .getExpectedTableFrameCount(buildSizeInTuple, 
frameSize);
-                    long expectedHashTableSizeForProbeInFrame = 
SerializableHashTable
+                    long expectedHashTableSizeForProbeInFrame = 
LinearProbeHashTable
                             .getExpectedTableFrameCount(probeSizeInTuple, 
frameSize);
 
                     //Apply in-Mem HJ if possible
@@ -716,8 +717,8 @@
                             state.memForJoin * ctx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new 
FramePoolBackedFrameBufferManager(framePool);
 
-                    ISerializableTable table = new 
SerializableHashTable(tabSize, ctx, bufferManager);
-                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, 
tabSize, new FrameTupleAccessor(probeRDesc),
+                    ISerializableTable table = new 
LinearProbeHashTable(tabSize, ctx);
+                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new 
FrameTupleAccessor(probeRDesc),
                             hpcRepProbe, new FrameTupleAccessor(buildRDesc), 
buildRDesc, hpcRepBuild,
                             new FrameTuplePairComparator(pKeys, bKeys, 
comparators), isLeftOuter, nonMatchWriter, table,
                             predEvaluator, isReversed, bufferManager);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index d0e0616..dea9c98 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -64,4 +64,6 @@
      * Prints out the internal information of this table.
      */
     String printInfo();
+
+    int getTableSize();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
new file mode 100644
index 0000000..a194631
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
/**
 * A frame wrapper that serializes/deserializes big-endian ints into an
 * array-backed {@link ByteBuffer}. Unused slots are filled with 0xFF bytes so
 * that an empty int slot reads back as {@code 0xFFFFFFFF} (i.e. a negative
 * value), which the hash tables use as the "invalid/empty" marker.
 */
public class IntSerDeBuffer {

    protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF;
    protected static final int INT_SIZE = 4;

    ByteBuffer byteBuffer;
    byte[] bytes;

    /**
     * Wraps the given frame and invalidates every slot.
     *
     * @param byteBuffer backing frame; must be array-backed ({@code array()} is used)
     */
    public IntSerDeBuffer(ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        this.bytes = byteBuffer.array();
        resetFrame();
    }

    /**
     * Reads the big-endian int stored at the given int position
     * (a slot index, not a byte offset).
     */
    public int getInt(int pos) {
        int offset = pos * INT_SIZE;
        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
                + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] & 0xff);
    }

    /**
     * Writes {@code value} as a big-endian int at the given int position
     * (a slot index, not a byte offset).
     */
    public void writeInt(int pos, int value) {
        int offset = pos * INT_SIZE;
        bytes[offset++] = (byte) (value >> 24);
        bytes[offset++] = (byte) (value >> 16);
        bytes[offset++] = (byte) (value >> 8);
        bytes[offset] = (byte) (value);
    }

    /**
     * Marks {@code intRange} consecutive int slots starting at {@code intPos}
     * as invalid (each reads back as {@code 0xFFFFFFFF}).
     */
    public void writeInvalidVal(int intPos, int intRange) {
        int offset = intPos * INT_SIZE;
        Arrays.fill(bytes, offset, offset + INT_SIZE * intRange, INVALID_BYTE_VALUE);
    }

    /** @return the capacity of this frame in int slots */
    public int capacity() {
        return bytes.length / INT_SIZE;
    }

    /** @return the capacity of this frame in bytes */
    public int getByteCapacity() {
        return bytes.length;
    }

    /** @return the underlying frame buffer */
    public ByteBuffer getByteBuffer() {
        return byteBuffer;
    }

    /** Invalidates every slot in this frame. */
    public void resetFrame() {
        Arrays.fill(bytes, INVALID_BYTE_VALUE);
    }

}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
new file mode 100644
index 0000000..7ea461f
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+import java.nio.ByteBuffer;
+
+public class LinearProbeHashTable implements ISerializableTable {
+    private static final int INT_SIZE = 4;
+    private static final int ENTRY_SIZE = 8;
+    private static final int INVERSE_LOAD_FACTOR = 2;
+    private IHyracksFrameMgrContext ctx;
+    private int tableSize;
+    private int frameCnt;
+    private int frameSize;
+    private int frameCapacity;
+    private int currentByteSize;
+    private int tupleCount;
+
+    private IntSerDeBuffer[] frames;
+
+    public LinearProbeHashTable(int totalTupleCount, final 
IHyracksFrameMgrContext ctx) {
+        this.ctx = ctx;
+        this.tableSize = totalTupleCount * INVERSE_LOAD_FACTOR;
+        this.frameSize = ctx.getInitialFrameSize();
+        this.frameCapacity = frameSize / (ENTRY_SIZE); // Frame capacity in 
bucket
+        this.frameCnt = (int) Math.ceil(tableSize * 1.0 / frameCapacity);
+        this.frames = new IntSerDeBuffer[frameCnt];
+        this.currentByteSize = 0;
+    }
+
+    private ByteBuffer getFrame(int size) throws HyracksDataException {
+        currentByteSize += size;
+        return ctx.allocateFrame(size);
+    }
+
+    private int safeFrameRead(int frameIdx, int tupleOffset) throws 
HyracksDataException {
+        if (frames[frameIdx] == null) {
+            ByteBuffer newBufferFrame = getFrame(frameSize);
+            frames[frameIdx] = new IntSerDeBuffer(newBufferFrame);
+        }
+        int result = frames[frameIdx].getInt(tupleOffset * (ENTRY_SIZE / 
INT_SIZE));
+        return result;
+    }
+
+    private int entryToTupleOffset(int entry) {
+        return (entry % frameCapacity);
+    }
+
+    @Override
+    public boolean insert(int entry, TuplePointer tuplePointer) throws 
HyracksDataException {
+        int entryPtr = entry;
+        int visitedRecords = 0;
+        // insert is guaranteed to be good
+        while (safeFrameRead(entryPtr / frameCapacity, 
entryToTupleOffset(entryPtr)) >= 0
+                && visitedRecords < tableSize) {
+            visitedRecords++;
+            entryPtr = (entryPtr + 1) % tableSize;
+        }
+        if (visitedRecords >= tableSize) {
+            return false;
+        }
+        writeEntry(entryPtr / frameCapacity, entryToTupleOffset(entryPtr), 
tuplePointer);
+        return true;
+    }
+
+    private void writeEntry(int frameIndex, int tupleOffset, TuplePointer 
tuplePointer) throws HyracksDataException {
+        int entryOffset = tupleOffset * ENTRY_SIZE / INT_SIZE;
+        frames[frameIndex].writeInt(entryOffset, tuplePointer.getFrameIndex());
+        frames[frameIndex].writeInt(entryOffset + 1, 
tuplePointer.getTupleIndex());
+    }
+
+    @Override
+    public void delete(int entry) {
+        // no op
+    }
+
+    @Override
+    public boolean getTuplePointer(int entry, int tupleOffset, TuplePointer 
tuplePointer) {
+        int actualEntry = (entry + tupleOffset) % tableSize;
+        if (frames[actualEntry / frameCapacity] == null
+                || frames[actualEntry / 
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE)
+                < 0) {
+            return false;
+        }
+        int frameIndex =
+                frames[actualEntry / 
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE);
+        int tupleIndex =
+                frames[actualEntry / 
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE + 
1);
+        tuplePointer.reset(frameIndex, tupleIndex);
+        return true;
+    }
+
+    @Override
+    public int getCurrentByteSize() {
+        return currentByteSize;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public int getTupleCount(int entry) {
+        int result = 0;
+        int ptr = entry;
+        while (frames[ptr / frameCapacity] != null
+                && frames[ptr / frameCapacity].getInt(entryToTupleOffset(ptr) 
* ENTRY_SIZE / INT_SIZE) >= 0
+                && result < tableSize) {
+            result++;
+            ptr = (ptr + 1) % tableSize;
+        }
+        return result;
+    }
+
+    @Override
+    public void reset() {
+        for (IntSerDeBuffer frame : frames) {
+            if (frame != null) {
+                frame.resetFrame();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        int framesToDeallocate = 0;
+        for (int iter1 = 0; iter1 < frames.length; iter1++) {
+            if (frames[iter1] != null) {
+                framesToDeallocate++;
+                frames[iter1] = null;
+            }
+        }
+        tupleCount = 0;
+        currentByteSize = 0;
+        ctx.deallocateFrames(framesToDeallocate);
+    }
+
+    @Override
+    public boolean isGarbageCollectionNeeded() {
+        return true;
+    }
+
+    @Override
+    public int collectGarbage(ITuplePointerAccessor bufferAccessor, 
ITuplePartitionComputer tpc)
+            throws HyracksDataException {
+        throw new HyracksDataException("Not supported");
+    }
+
+    @Override
+    public String printInfo() {
+        return "NA";
+    }
+
+    @Override
+    public int getTableSize() {
+        return tableSize;
+    }
+
+    public static long getExpectedTableFrameCount(long tupleCount, int 
frameSize) {
+        return (long) (Math.ceil((double) tupleCount * INVERSE_LOAD_FACTOR * 
ENTRY_SIZE / (double) frameSize));
+    }
+
+    public static long getExpectedTableByteSize(long tupleCount, int 
frameSize) {
+        return getExpectedTableFrameCount(tupleCount, frameSize) * frameSize;
+    }
+
+    public static long calculateFrameCountDeltaForTableSizeChange(long 
origTupleCount, long delta, int frameSize) {
+        long originalFrameCount = getExpectedTableFrameCount(origTupleCount, 
frameSize);
+        long newFrameCount = getExpectedTableFrameCount(origTupleCount + 
delta, frameSize);
+        return newFrameCount - originalFrameCount;
+    }
+
+    public static long calculateByteSizeDeltaForTableSizeChange(long 
origTupleCount, long delta, int frameSize) {
+        return calculateFrameCountDeltaForTableSizeChange(origTupleCount, 
delta, frameSize) * frameSize;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
index b1d1f27..9311df3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
@@ -20,7 +20,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
@@ -47,7 +46,6 @@
     // Initial entry slot size
     protected static final int INIT_ENTRY_SIZE = 4;
     protected static final int INVALID_VALUE = 0xFFFFFFFF;
-    protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF;
 
     // Header frame array
     protected IntSerDeBuffer[] headers;
@@ -486,57 +484,16 @@
         return -1;
     }
 
-    static class IntSerDeBuffer {
 
-        ByteBuffer byteBuffer;
-        byte[] bytes;
-
-        public IntSerDeBuffer(ByteBuffer byteBuffer) {
-            this.byteBuffer = byteBuffer;
-            this.bytes = byteBuffer.array();
-            resetFrame();
-        }
-
-        public int getInt(int pos) {
-            int offset = pos * 4;
-            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 
0xff) << 16)
-                    + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] & 
0xff);
-        }
-
-        public void writeInt(int pos, int value) {
-            int offset = pos * 4;
-            bytes[offset++] = (byte) (value >> 24);
-            bytes[offset++] = (byte) (value >> 16);
-            bytes[offset++] = (byte) (value >> 8);
-            bytes[offset] = (byte) (value);
-        }
-
-        public void writeInvalidVal(int intPos, int intRange) {
-            int offset = intPos * 4;
-            Arrays.fill(bytes, offset, offset + INT_SIZE * intRange, 
INVALID_BYTE_VALUE);
-        }
-
-        public int capacity() {
-            return bytes.length / 4;
-        }
-
-        public int getByteCapacity() {
-            return bytes.length;
-        }
-
-        public ByteBuffer getByteBuffer() {
-            return byteBuffer;
-        }
-
-        public void resetFrame() {
-            Arrays.fill(bytes, INVALID_BYTE_VALUE);
-        }
-
-    }
 
     // NOTE(review): returns null rather than an empty/summary string, so every
     // caller must null-check. Presumably no diagnostics are tracked for this
     // table — confirm callers tolerate null before changing.
     @Override
     public String printInfo() {
         return null;
     }
 
+    @Override
+    public int getTableSize() {
+        return tableSize;
+    }
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
new file mode 100644
index 0000000..f10a677
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LinearProbeHashTableTest {
+
+    private IHyracksFrameMgrContext ctx;
+
+    @Before
+    public void setUp() {
+        ctx = new FrameManager(256);
+    }
+
+    @Test
+    public void testLinearProbeWithoutCollision() throws Exception {
+        int elementRange = 100;
+        LinearProbeHashTable hashTable = new 
LinearProbeHashTable(elementRange, ctx);
+        for (int iter1 = 0; iter1 < elementRange; iter1++) {
+            hashTable.insert(iter1, new TuplePointer(iter1, iter1));
+        }
+        TuplePointer readPtr = new TuplePointer(-1, -1);
+        for (int iter1 = 0; iter1 < elementRange; iter1++) {
+            hashTable.getTuplePointer(iter1, 0, readPtr);
+            Assert.assertEquals(iter1, readPtr.getFrameIndex());
+            Assert.assertEquals(iter1, readPtr.getTupleIndex());
+            readPtr.reset(-1, -1);
+        }
+    }
+
+    @Test
+    public void testLinearProbeWithCollision() throws Exception {
+        int elementRange = 100;
+        int repeat = 4;
+        LinearProbeHashTable hashTable = new LinearProbeHashTable(elementRange 
* repeat, ctx);
+        for (int iter1 = 0; iter1 < elementRange; iter1++) {
+            for (int iter2 = 0; iter2 < repeat; iter2++) {
+                hashTable.insert(iter1, new TuplePointer(iter1, iter1));
+            }
+        }
+        TuplePointer readPtr = new TuplePointer(-1, -1);
+        for (int iter1 = 0; iter1 < elementRange; iter1++) {
+            int bucketSize = hashTable.getTupleCount(iter1);
+            int foundCnt = 0;
+            for (int iter2 = 0; iter2 < bucketSize && foundCnt < repeat; 
iter2++) {
+                hashTable.getTuplePointer(iter1, iter2, readPtr);
+                if (readPtr.getTupleIndex() == iter1) {
+                    foundCnt++;
+                }
+            }
+            Assert.assertEquals(repeat, foundCnt);
+        }
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1993
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I61e11d8223902cc34e81738660c1b3ed0ab86318
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to