Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1993
Change subject: [WIP] Introduce LinearProbeHashTable for InMemoryHashJoin
......................................................................
[WIP] Introduce LinearProbeHashTable for InMemoryHashJoin
WIP
Change-Id: I61e11d8223902cc34e81738660c1b3ed0ab86318
---
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
10 files changed, 398 insertions(+), 93 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/93/1993/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d9e6180..efef389 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -58,6 +58,7 @@
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -308,8 +309,8 @@
ITuplePartitionComputer hpc1 = new
FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
int tableSize = (int) (state.memoryForHashtable *
recordsPerFrame * factor);
- ISerializableTable table = new
SimpleSerializableHashTable(tableSize, ctx);
- state.joiner = new InMemoryHashJoin(ctx, tableSize, new
FrameTupleAccessor(rd0), hpc0,
+ ISerializableTable table = new
LinearProbeHashTable(tableSize, ctx);
+ state.joiner = new InMemoryHashJoin(ctx, new
FrameTupleAccessor(rd0), hpc0,
new FrameTupleAccessor(rd1), rd1, hpc1,
new FrameTuplePairComparator(keys0, keys1,
comparators), isLeftOuter, nullWriters1, table,
predEvaluator, null);
@@ -497,7 +498,7 @@
} else {
tableSize = (int) (memsize * recordsPerFrame *
factor);
}
- ISerializableTable table = new
SimpleSerializableHashTable(tableSize, ctx);
+ ISerializableTable table = new
LinearProbeHashTable(tableSize, ctx);
for (int partitionid = 0; partitionid <
state.nPartitions; partitionid++) {
RunFileWriter buildWriter =
buildWriters[partitionid];
RunFileWriter probeWriter =
probeWriters[partitionid];
@@ -505,8 +506,8 @@
continue;
}
table.reset();
- InMemoryHashJoin joiner = new
InMemoryHashJoin(ctx, tableSize,
- new FrameTupleAccessor(rd0), hpcRep0,
new FrameTupleAccessor(rd1), rd1, hpcRep1,
+ InMemoryHashJoin joiner = new
InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0),
+ hpcRep0, new FrameTupleAccessor(rd1),
rd1, hpcRep1,
new FrameTuplePairComparator(keys0,
keys1, comparators), isLeftOuter,
nullWriters1, table, predEvaluator,
null);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ec1c3a9..b5248a4 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -56,7 +56,6 @@
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuild;
private final ISerializableTable table;
- private final int tableSize;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of
tuples, we are writing in output
private final IPredicateEvaluator predEvaluator;
@@ -67,23 +66,20 @@
private static final Logger LOGGER =
Logger.getLogger(InMemoryHashJoin.class.getName());
- public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize,
FrameTupleAccessor accessorProbe,
- ITuplePartitionComputer tpcProbe, FrameTupleAccessor
accessorBuild, RecordDescriptor rDBuild,
- ITuplePartitionComputer tpcBuild, FrameTuplePairComparator
comparator, boolean isLeftOuter,
- IMissingWriter[] missingWritersBuild, ISerializableTable table,
IPredicateEvaluator predEval,
- ISimpleFrameBufferManager bufferManager)
+ public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor
accessorProbe, ITuplePartitionComputer tpcProbe,
+ FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild,
+ FrameTuplePairComparator comparator, boolean isLeftOuter,
IMissingWriter[] missingWritersBuild,
+ ISerializableTable table, IPredicateEvaluator predEval,
ISimpleFrameBufferManager bufferManager)
throws HyracksDataException {
- this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, rDBuild,
tpcBuild, comparator, isLeftOuter,
+ this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild,
comparator, isLeftOuter,
missingWritersBuild, table, predEval, false, bufferManager);
}
- public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize,
FrameTupleAccessor accessorProbe,
- ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild,
- RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild,
FrameTuplePairComparator comparator,
- boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
ISerializableTable table,
- IPredicateEvaluator predEval, boolean reverse,
ISimpleFrameBufferManager bufferManager)
- throws HyracksDataException {
- this.tableSize = tableSize;
+ public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor
accessorProbe, ITuplePartitionComputer tpcProbe,
+ FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild,
+ FrameTuplePairComparator comparator, boolean isLeftOuter,
IMissingWriter[] missingWritersBuild,
+ ISerializableTable table, IPredicateEvaluator predEval, boolean
reverse,
+ ISimpleFrameBufferManager bufferManager) throws
HyracksDataException {
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<>();
@@ -109,12 +105,12 @@
reverseOutputOrder = reverse;
this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
this.bufferManager = bufferManager;
- if (tableSize != 0) {
+ if (table.getTableSize() != 0) {
isTableCapacityNotZero = true;
} else {
isTableCapacityNotZero = false;
}
- LOGGER.fine("InMemoryHashJoin has been created for a table size of " +
tableSize + " for Thread ID "
+ LOGGER.fine("InMemoryHashJoin has been created for a table size of " +
table.getTableSize() + " for Thread ID "
+ Thread.currentThread().getId() + ".");
}
@@ -124,7 +120,7 @@
accessorBuild.reset(buffer);
int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpcBuild.partition(accessorBuild, i, tableSize);
+ int entry = tpcBuild.partition(accessorBuild, i,
table.getTableSize());
storedTuplePointer.reset(bIndex, i);
// If an insertion fails, then tries to insert the same tuple
pointer again after compacting the table.
if (!table.insert(entry, storedTuplePointer)) {
@@ -160,7 +156,7 @@
void join(int tid, IFrameWriter writer) throws HyracksDataException {
boolean matchFound = false;
if (isTableCapacityNotZero) {
- int entry = tpcProbe.partition(accessorProbe, tid, tableSize);
+ int entry = tpcProbe.partition(accessorProbe, tid,
table.getTableSize());
int tupleCount = table.getTupleCount(entry);
for (int i = 0; i < tupleCount; i++) {
table.getTuplePointer(entry, i, storedTuplePointer);
@@ -175,6 +171,7 @@
appendToResult(tid, tIndex, writer);
}
}
+ storedTuplePointer.reset(-1, -1);
}
}
if (!matchFound && isLeftOuter) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 702dae6..081522a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -54,6 +54,7 @@
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -202,8 +203,8 @@
.createPartitioner();
state = new
HashBuildTaskState(ctx.getJobletContext().getJobId(),
new TaskId(getActivityId(), partition));
- ISerializableTable table = new
SerializableHashTable(tableSize, ctx, bufferManager);
- state.joiner = new InMemoryHashJoin(ctx, tableSize, new
FrameTupleAccessor(rd0), hpc0,
+ ISerializableTable table = new
LinearProbeHashTable(tableSize, ctx);
+ state.joiner = new InMemoryHashJoin(ctx, new
FrameTupleAccessor(rd0), hpc0,
new FrameTupleAccessor(rd1), rd1, hpc1,
new FrameTuplePairComparator(keys0, keys1,
comparators), isLeftOuter, nullWriters1, table,
predEvaluator, bufferManager);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 5de272a..2a8586a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -45,7 +45,7 @@
import
org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
import
org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -110,8 +110,7 @@
private int[] probePSizeInTups;
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int
memSizeInFrames, int numOfPartitions,
- String probeRelName,
- String buildRelName, int[] probeKeys, int[] buildKeys,
IBinaryComparator[] comparators,
+ String probeRelName, String buildRelName, int[] probeKeys, int[]
buildKeys, IBinaryComparator[] comparators,
RecordDescriptor probeRd, RecordDescriptor buildRd,
ITuplePartitionComputer probeHpc,
ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
boolean isLeftOuter,
IMissingWriterFactory[] nullWriterFactories1) {
@@ -301,8 +300,7 @@
// Calculates the expected hash table size for the given number of
tuples in main memory
// and deducts it from the free space.
- long hashTableByteSizeForInMemTuples =
SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
- frameSize);
+ long hashTableByteSizeForInMemTuples =
LinearProbeHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
freeSpace -= hashTableByteSizeForInMemTuples;
// In the case where free space is less than zero after considering
the hash table size,
@@ -317,8 +315,9 @@
int pidToSpill = selectSinglePartitionToSpill(freeSpace,
inMemTupCount, frameSize);
if (pidToSpill >= 0) {
// There is a suitable one. We spill that partition to the
disk.
- long hashTableSizeDecrease =
-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
- inMemTupCount, -buildPSizeInTups[pidToSpill],
frameSize);
+ long hashTableSizeDecrease = -LinearProbeHashTable
+
.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-buildPSizeInTups[pidToSpill],
+ frameSize);
freeSpace = freeSpace +
bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
inMemTupCount -= buildPSizeInTups[pidToSpill];
spillPartition(pidToSpill);
@@ -340,7 +339,7 @@
// Since the number of tuples in memory has been decreased,
// the hash table size will be decreased, too.
// We put minus since the method returns a negative value
to represent a newly reclaimed space.
- long expectedHashTableSizeDecrease = -SerializableHashTable
+ long expectedHashTableSizeDecrease = -LinearProbeHashTable
.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-numberOfTuplesToBeSpilled,
frameSize);
freeSpace = freeSpace + spaceToBeReturned +
expectedHashTableSizeDecrease;
@@ -356,8 +355,7 @@
// If more partitions have been spilled to the disk, calculate the
expected hash table size again
// before bringing some partitions to main memory.
if (moreSpilled) {
- hashTableByteSizeForInMemTuples =
SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
- frameSize);
+ hashTableByteSizeForInMemTuples =
LinearProbeHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
}
// Brings back some partitions if there is enough free space.
@@ -366,7 +364,7 @@
if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
break;
}
- long expectedHashTableByteSizeIncrease = SerializableHashTable
+ long expectedHashTableByteSizeIncrease = LinearProbeHashTable
.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
buildPSizeInTups[pid], frameSize);
freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) -
expectedHashTableByteSizeIncrease;
inMemTupCount += buildPSizeInTups[pid];
@@ -393,7 +391,7 @@
continue;
}
// We put minus since the method returns a negative value to
represent a newly reclaimed space.
- spaceAfterSpill = currentFreeSpace +
bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+ spaceAfterSpill = currentFreeSpace +
bufferManager.getPhysicalSize(p) + (-LinearProbeHashTable
.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
-buildPSizeInTups[p], frameSize));
if (spaceAfterSpill == 0) {
// Found the perfect one. Just returns this partition.
@@ -412,8 +410,9 @@
&& i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
int spilledTupleCount = buildPSizeInTups[i];
// Expected hash table size increase after reloading this partition
- long expectedHashTableByteSizeIncrease =
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
- inMemTupCount, spilledTupleCount,
ctx.getInitialFrameSize());
+ long expectedHashTableByteSizeIncrease = LinearProbeHashTable
+ .calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
spilledTupleCount,
+ ctx.getInitialFrameSize());
if (freeSpace >= buildRFWriters[i].getFileSize() +
expectedHashTableByteSizeIncrease) {
return i;
}
@@ -451,8 +450,8 @@
}
private void createInMemoryJoiner(int inMemTupCount) throws
HyracksDataException {
- ISerializableTable table = new SerializableHashTable(inMemTupCount,
ctx, bufferManagerForHashTable);
- this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new
FrameTupleAccessor(probeRd), probeHpc,
+ ISerializableTable table = new LinearProbeHashTable(inMemTupCount,
ctx);
+ this.inMemJoiner = new InMemoryHashJoin(ctx, new
FrameTupleAccessor(probeRd), probeHpc,
new FrameTupleAccessor(buildRd), buildRd, buildHpc,
new FrameTuplePairComparator(probeKeys, buildKeys,
comparators), isLeftOuter, nonMatchWriters, table,
predEvaluator, isReversed, bufferManagerForHashTable);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c44c583..ef96b8e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -64,6 +64,7 @@
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
+import org.apache.hyracks.dataflow.std.structures.LinearProbeHashTable;
import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -492,9 +493,9 @@
}
// Calculate the expected hash table size for the both
side.
- long expectedHashTableSizeForBuildInFrame =
SerializableHashTable
+ long expectedHashTableSizeForBuildInFrame =
LinearProbeHashTable
.getExpectedTableFrameCount(buildSizeInTuple,
frameSize);
- long expectedHashTableSizeForProbeInFrame =
SerializableHashTable
+ long expectedHashTableSizeForProbeInFrame =
LinearProbeHashTable
.getExpectedTableFrameCount(probeSizeInTuple,
frameSize);
//Apply in-Mem HJ if possible
@@ -716,8 +717,8 @@
state.memForJoin * ctx.getInitialFrameSize());
ISimpleFrameBufferManager bufferManager = new
FramePoolBackedFrameBufferManager(framePool);
- ISerializableTable table = new
SerializableHashTable(tabSize, ctx, bufferManager);
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx,
tabSize, new FrameTupleAccessor(probeRDesc),
+ ISerializableTable table = new
LinearProbeHashTable(tabSize, ctx);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new
FrameTupleAccessor(probeRDesc),
hpcRepProbe, new FrameTupleAccessor(buildRDesc),
buildRDesc, hpcRepBuild,
new FrameTuplePairComparator(pKeys, bKeys,
comparators), isLeftOuter, nonMatchWriter, table,
predEvaluator, isReversed, bufferManager);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index d0e0616..dea9c98 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -64,4 +64,6 @@
* Prints out the internal information of this table.
*/
String printInfo();
+
+ int getTableSize();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
new file mode 100644
index 0000000..a194631
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/IntSerDeBuffer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
/**
 * A fixed-size frame wrapper that reads and writes 4-byte big-endian integers
 * at integer-granular positions. A freshly wrapped (or reset) frame is filled
 * with {@code 0xFF} bytes, so every integer slot reads as {@code -1}: negative
 * values mark empty/invalid slots for the hash tables built on this buffer.
 */
public class IntSerDeBuffer {

    // Fill byte for blanking; four of them form the int -1 (the "empty" marker).
    protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF;
    // Width of one stored integer, in bytes.
    protected static final int INT_SIZE = 4;

    ByteBuffer byteBuffer;
    byte[] bytes;

    /**
     * Wraps the given frame and blanks it to the invalid pattern.
     * The frame must be array-backed ({@code byteBuffer.array()} is used for
     * direct byte access).
     *
     * @param byteBuffer the array-backed frame to wrap
     */
    public IntSerDeBuffer(ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        this.bytes = byteBuffer.array();
        resetFrame();
    }

    /** Reads the big-endian int stored at integer position {@code pos}. */
    public int getInt(int pos) {
        int offset = pos * INT_SIZE;
        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
                + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] & 0xff);
    }

    /** Writes {@code value} big-endian at integer position {@code pos}. */
    public void writeInt(int pos, int value) {
        int offset = pos * INT_SIZE;
        bytes[offset++] = (byte) (value >> 24);
        bytes[offset++] = (byte) (value >> 16);
        bytes[offset++] = (byte) (value >> 8);
        bytes[offset] = (byte) (value);
    }

    /**
     * Marks {@code intRange} consecutive integer slots starting at
     * {@code intPos} as invalid (each slot reads as -1 afterwards).
     */
    public void writeInvalidVal(int intPos, int intRange) {
        int offset = intPos * INT_SIZE;
        Arrays.fill(bytes, offset, offset + INT_SIZE * intRange, INVALID_BYTE_VALUE);
    }

    /** @return the number of integer slots this frame can hold */
    public int capacity() {
        return bytes.length / INT_SIZE;
    }

    /** @return the size of the underlying frame in bytes */
    public int getByteCapacity() {
        return bytes.length;
    }

    /** @return the wrapped frame itself */
    public ByteBuffer getByteBuffer() {
        return byteBuffer;
    }

    /** Blanks the entire frame: every integer slot reads as -1 afterwards. */
    public void resetFrame() {
        Arrays.fill(bytes, INVALID_BYTE_VALUE);
    }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
new file mode 100644
index 0000000..7ea461f
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTable.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+import java.nio.ByteBuffer;
+
+public class LinearProbeHashTable implements ISerializableTable {
+ private static final int INT_SIZE = 4;
+ private static final int ENTRY_SIZE = 8;
+ private static final int INVERSE_LOAD_FACTOR = 2;
+ private IHyracksFrameMgrContext ctx;
+ private int tableSize;
+ private int frameCnt;
+ private int frameSize;
+ private int frameCapacity;
+ private int currentByteSize;
+ private int tupleCount;
+
+ private IntSerDeBuffer[] frames;
+
+ public LinearProbeHashTable(int totalTupleCount, final
IHyracksFrameMgrContext ctx) {
+ this.ctx = ctx;
+ this.tableSize = totalTupleCount * INVERSE_LOAD_FACTOR;
+ this.frameSize = ctx.getInitialFrameSize();
+ this.frameCapacity = frameSize / (ENTRY_SIZE); // Frame capacity in
bucket
+ this.frameCnt = (int) Math.ceil(tableSize * 1.0 / frameCapacity);
+ this.frames = new IntSerDeBuffer[frameCnt];
+ this.currentByteSize = 0;
+ }
+
+ private ByteBuffer getFrame(int size) throws HyracksDataException {
+ currentByteSize += size;
+ return ctx.allocateFrame(size);
+ }
+
+ private int safeFrameRead(int frameIdx, int tupleOffset) throws
HyracksDataException {
+ if (frames[frameIdx] == null) {
+ ByteBuffer newBufferFrame = getFrame(frameSize);
+ frames[frameIdx] = new IntSerDeBuffer(newBufferFrame);
+ }
+ int result = frames[frameIdx].getInt(tupleOffset * (ENTRY_SIZE /
INT_SIZE));
+ return result;
+ }
+
+ private int entryToTupleOffset(int entry) {
+ return (entry % frameCapacity);
+ }
+
+ @Override
+ public boolean insert(int entry, TuplePointer tuplePointer) throws
HyracksDataException {
+ int entryPtr = entry;
+ int visitedRecords = 0;
+ // insert is guaranteed to be good
+ while (safeFrameRead(entryPtr / frameCapacity,
entryToTupleOffset(entryPtr)) >= 0
+ && visitedRecords < tableSize) {
+ visitedRecords++;
+ entryPtr = (entryPtr + 1) % tableSize;
+ }
+ if (visitedRecords >= tableSize) {
+ return false;
+ }
+ writeEntry(entryPtr / frameCapacity, entryToTupleOffset(entryPtr),
tuplePointer);
+ return true;
+ }
+
+ private void writeEntry(int frameIndex, int tupleOffset, TuplePointer
tuplePointer) throws HyracksDataException {
+ int entryOffset = tupleOffset * ENTRY_SIZE / INT_SIZE;
+ frames[frameIndex].writeInt(entryOffset, tuplePointer.getFrameIndex());
+ frames[frameIndex].writeInt(entryOffset + 1,
tuplePointer.getTupleIndex());
+ }
+
+ @Override
+ public void delete(int entry) {
+ // no op
+ }
+
+ @Override
+ public boolean getTuplePointer(int entry, int tupleOffset, TuplePointer
tuplePointer) {
+ int actualEntry = (entry + tupleOffset) % tableSize;
+ if (frames[actualEntry / frameCapacity] == null
+ || frames[actualEntry /
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE)
+ < 0) {
+ return false;
+ }
+ int frameIndex =
+ frames[actualEntry /
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE);
+ int tupleIndex =
+ frames[actualEntry /
frameCapacity].getInt(entryToTupleOffset(actualEntry) * ENTRY_SIZE / INT_SIZE +
1);
+ tuplePointer.reset(frameIndex, tupleIndex);
+ return true;
+ }
+
+ @Override
+ public int getCurrentByteSize() {
+ return currentByteSize;
+ }
+
+ @Override
+ public int getTupleCount() {
+ return tupleCount;
+ }
+
+ @Override
+ public int getTupleCount(int entry) {
+ int result = 0;
+ int ptr = entry;
+ while (frames[ptr / frameCapacity] != null
+ && frames[ptr / frameCapacity].getInt(entryToTupleOffset(ptr)
* ENTRY_SIZE / INT_SIZE) >= 0
+ && result < tableSize) {
+ result++;
+ ptr = (ptr + 1) % tableSize;
+ }
+ return result;
+ }
+
+ @Override
+ public void reset() {
+ for (IntSerDeBuffer frame : frames) {
+ if (frame != null) {
+ frame.resetFrame();
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ int framesToDeallocate = 0;
+ for (int iter1 = 0; iter1 < frames.length; iter1++) {
+ if (frames[iter1] != null) {
+ framesToDeallocate++;
+ frames[iter1] = null;
+ }
+ }
+ tupleCount = 0;
+ currentByteSize = 0;
+ ctx.deallocateFrames(framesToDeallocate);
+ }
+
+ @Override
+ public boolean isGarbageCollectionNeeded() {
+ return true;
+ }
+
+ @Override
+ public int collectGarbage(ITuplePointerAccessor bufferAccessor,
ITuplePartitionComputer tpc)
+ throws HyracksDataException {
+ throw new HyracksDataException("Not supported");
+ }
+
+ @Override
+ public String printInfo() {
+ return "NA";
+ }
+
+ @Override
+ public int getTableSize() {
+ return tableSize;
+ }
+
+ public static long getExpectedTableFrameCount(long tupleCount, int
frameSize) {
+ return (long) (Math.ceil((double) tupleCount * INVERSE_LOAD_FACTOR *
ENTRY_SIZE / (double) frameSize));
+ }
+
+ public static long getExpectedTableByteSize(long tupleCount, int
frameSize) {
+ return getExpectedTableFrameCount(tupleCount, frameSize) * frameSize;
+ }
+
+ public static long calculateFrameCountDeltaForTableSizeChange(long
origTupleCount, long delta, int frameSize) {
+ long originalFrameCount = getExpectedTableFrameCount(origTupleCount,
frameSize);
+ long newFrameCount = getExpectedTableFrameCount(origTupleCount +
delta, frameSize);
+ return newFrameCount - originalFrameCount;
+ }
+
+ public static long calculateByteSizeDeltaForTableSizeChange(long
origTupleCount, long delta, int frameSize) {
+ return calculateFrameCountDeltaForTableSizeChange(origTupleCount,
delta, frameSize) * frameSize;
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
index b1d1f27..9311df3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
@@ -47,7 +46,6 @@
// Initial entry slot size
protected static final int INIT_ENTRY_SIZE = 4;
protected static final int INVALID_VALUE = 0xFFFFFFFF;
- protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF;
// Header frame array
protected IntSerDeBuffer[] headers;
@@ -486,57 +484,16 @@
return -1;
}
- static class IntSerDeBuffer {
- ByteBuffer byteBuffer;
- byte[] bytes;
-
- public IntSerDeBuffer(ByteBuffer byteBuffer) {
- this.byteBuffer = byteBuffer;
- this.bytes = byteBuffer.array();
- resetFrame();
- }
-
- public int getInt(int pos) {
- int offset = pos * 4;
- return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] &
0xff) << 16)
- + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] &
0xff);
- }
-
- public void writeInt(int pos, int value) {
- int offset = pos * 4;
- bytes[offset++] = (byte) (value >> 24);
- bytes[offset++] = (byte) (value >> 16);
- bytes[offset++] = (byte) (value >> 8);
- bytes[offset] = (byte) (value);
- }
-
- public void writeInvalidVal(int intPos, int intRange) {
- int offset = intPos * 4;
- Arrays.fill(bytes, offset, offset + INT_SIZE * intRange,
INVALID_BYTE_VALUE);
- }
-
- public int capacity() {
- return bytes.length / 4;
- }
-
- public int getByteCapacity() {
- return bytes.length;
- }
-
- public ByteBuffer getByteBuffer() {
- return byteBuffer;
- }
-
- public void resetFrame() {
- Arrays.fill(bytes, INVALID_BYTE_VALUE);
- }
-
- }
@Override
public String printInfo() {
return null;
}
+ @Override
+ public int getTableSize() {
+ return tableSize;
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
new file mode 100644
index 0000000..f10a677
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/LinearProbeHashTableTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LinearProbeHashTableTest {
+
+ private IHyracksFrameMgrContext ctx;
+
+ @Before
+ public void setUp() {
+ ctx = new FrameManager(256);
+ }
+
+ @Test
+ public void testLinearProbeWithoutCollision() throws Exception {
+ int elementRange = 100;
+ LinearProbeHashTable hashTable = new
LinearProbeHashTable(elementRange, ctx);
+ for (int iter1 = 0; iter1 < elementRange; iter1++) {
+ hashTable.insert(iter1, new TuplePointer(iter1, iter1));
+ }
+ TuplePointer readPtr = new TuplePointer(-1, -1);
+ for (int iter1 = 0; iter1 < elementRange; iter1++) {
+ hashTable.getTuplePointer(iter1, 0, readPtr);
+ Assert.assertEquals(iter1, readPtr.getFrameIndex());
+ Assert.assertEquals(iter1, readPtr.getTupleIndex());
+ readPtr.reset(-1, -1);
+ }
+ }
+
+ @Test
+ public void testLinearProbeWithCollision() throws Exception {
+ int elementRange = 100;
+ int repeat = 4;
+ LinearProbeHashTable hashTable = new LinearProbeHashTable(elementRange
* repeat, ctx);
+ for (int iter1 = 0; iter1 < elementRange; iter1++) {
+ for (int iter2 = 0; iter2 < repeat; iter2++) {
+ hashTable.insert(iter1, new TuplePointer(iter1, iter1));
+ }
+ }
+ TuplePointer readPtr = new TuplePointer(-1, -1);
+ for (int iter1 = 0; iter1 < elementRange; iter1++) {
+ int bucketSize = hashTable.getTupleCount(iter1);
+ int foundCnt = 0;
+ for (int iter2 = 0; iter2 < bucketSize && foundCnt < repeat;
iter2++) {
+ hashTable.getTuplePointer(iter1, iter2, readPtr);
+ if (readPtr.getTupleIndex() == iter1) {
+ foundCnt++;
+ }
+ }
+ Assert.assertEquals(repeat, foundCnt);
+ }
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1993
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I61e11d8223902cc34e81738660c1b3ed0ab86318
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>