Taewoo Kim has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1056
Change subject: ASTERIXDB-1566: Fixed External Hash Group By to conform to the
memory budget
......................................................................
ASTERIXDB-1566: Fixed External Hash Group By to conform to the memory budget
- External Hash Group By now conforms to the memory budget
  (compiler.groupmemory).
- This is a temporary fix; another patch set will be submitted once the
  final design is settled.
Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
8 files changed, 255 insertions(+), 133 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/56/1056/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index cd088c1..109d49d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.PrintWriter;
+import java.math.BigInteger;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
@@ -248,6 +249,27 @@
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
+ // To set the number of hash values (slots) in the external group-by hash table,
+ // we try to find a prime number between ceil(0.8 * groupFrameLimit * frameSize / 8) and
+ // ceil(0.9 * groupFrameLimit * frameSize / 8).
+ // Here, we assume that each hash entry pointer occupies 8 bytes (frame, offset),
+ // so a frame holds frameSize / 8 hash entry pointers.
+ // We cap the range at 90% because if the hash table occupied 100% of the budget,
+ // there would be no space left for the actual tuples.
+ int hashEntryMinSize = (int) Math.ceil(0.8 * groupFrameLimit *
frameSize / 8);
+ int hashEntryMaxSize = (int) Math.ceil(0.9 * groupFrameLimit *
frameSize / 8);
+ BigInteger tableSizePrimeNumber =
BigInteger.valueOf(hashEntryMinSize).nextProbablePrime();
+ double capacityOccupationRatio = 0.8;
+ // If the prime exceeds the 90% bound, lower the ratio until a prime within the bound is found.
+ while (tableSizePrimeNumber.longValue() > hashEntryMaxSize) {
+ // Reduce the ratio by 2 percentage points and look for a prime again.
+ capacityOccupationRatio -= 0.02;
+ hashEntryMinSize = (int) Math.ceil(capacityOccupationRatio *
groupFrameLimit * frameSize / 8);
+ tableSizePrimeNumber =
BigInteger.valueOf(hashEntryMinSize).nextProbablePrime();
+ }
+ OptimizationConfUtil.getPhysicalOptimizationConfig()
+ .setExternalGroupByTableSize(tableSizePrimeNumber.intValue());
+
HeuristicCompilerFactoryBuilder builder =
new
HeuristicCompilerFactoryBuilder(AqlOptimizationContextFactory.INSTANCE);
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index a0a9ab0..db9d7e5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -27,6 +27,9 @@
public interface IPartitionedTupleBufferManager {
+ // Returns the total number of frames currently allocated to this buffer manager.
+ int getFrameCount();
+
int getNumPartitions();
int getNumTuples(int partition);
@@ -37,14 +40,16 @@
* Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start,
int size) into
* specified partition. The handle is written into the tuplepointer.
* <br>
- * If {@code byteArray} contains the {@code fieldEndOffsets} already, then
please set the {@code fieldEndOffsets} as NULL
+ * If {@code byteArray} already contains the field end offsets,
+ * then please set {@code fieldEndOffsets} to null.
*
* @param partition
* the id of the partition to insert the tuple into
* @param byteArray
* the byteArray which contains the tuple
* @param fieldEndOffsets
- * the fieldEndOffsets which comes from the ArrayTupleBuilder,
please set it to NULL if the {@code byteArray} already contains the
fieldEndOffsets
+ * the fieldEndOffsets, which come from the ArrayTupleBuilder; please set it to null
+ * if {@code byteArray} already contains the field end offsets
* @param start
* the start offset in the {@code byteArray}
* @param size
@@ -59,7 +64,7 @@
/**
* Insert tuple {@code tupleId} from the {@code tupleAccessor} into the
given partition.
* The returned handle is written into the tuplepointer
- *
+ *
* @param partition
* the id of the partition to insert the tuple
* @param tupleAccessor
@@ -67,7 +72,7 @@
* @param tupleId
* the id of the tuple from the tupleAccessor
* @param pointer
- * the returned pointer indicating the handler to later fetch
the tuple from the buffer maanager
+ * the returned pointer indicating the handler to later fetch
the tuple from the buffer manager
* @return true if the insertion succeed. Otherwise return false.
* @throws HyracksDataException
*/
@@ -75,8 +80,9 @@
throws HyracksDataException;
/**
- * Reset to the initial states. The previous allocated resources won't be
released in order to be used in the next round.
- *
+ * Resets to the initial state. Previously allocated resources are not released,
+ * so that they can be reused in the next round.
+ *
* @throws HyracksDataException
*/
void reset() throws HyracksDataException;
@@ -93,7 +99,7 @@
* This partition will not be cleared.
* Currently it is used by Join where we flush the inner partition to the
join (as a frameWriter),
* but we will still keep the inner for the next outer partition.
- *
+ *
* @param pid
* @param writer
* @throws HyracksDataException
@@ -102,7 +108,7 @@
/**
* Clear the memory occupation of the particular partition.
- *
+ *
* @param partition
* @throws HyracksDataException
*/
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index f9387a9..6751296 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -40,14 +40,22 @@
}
public int selectVictimPartition(int failedToInsertPartition) {
- // To avoid flush the half-full frame, it's better to spill itself.
+ // To avoid flushing a half-full frame, it's better to spill the failed partition itself.
if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
return failedToInsertPartition;
}
+ // Try to find a victim among the partitions that have already been spilled.
+ // We want to keep in-memory partitions (those not yet spilled to disk) in memory as long as possible
+ // to reduce the overhead of writing to and reading from disk.
int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
int maxToSpillPartSize = 0;
- // if we couldn't find the already spilled partition, or it is too
small to flush that one,
+ // If we couldn't find an already spilled partition, or it is too small to be worth flushing,
// try to flush an in memory partition.
+ //
+ // Note: currently, createAtMostOneFrameForSpilledPartitionConstrain, which we use for spilled partitions,
+ // limits each spilled partition to at most one frame. So the condition of the following if statement
+ // normally holds, i.e., we need to spill an in-memory partition anyway.
+ // The check becomes more meaningful once another constraint policy is introduced.
if (partitionToSpill < 0
|| (maxToSpillPartSize =
bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) {
int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
@@ -81,7 +89,7 @@
/**
* Create an constrain for the already spilled partition that it can only
use at most one frame.
- *
+ *
* @param spillStatus
* @return
*/
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index c193c3b..c1c6672 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -47,6 +47,8 @@
private final FixedSizeFrameTupleAppender appender;
private BufferInfo tempInfo;
private final IPartitionedMemoryConstrain constrain;
+ // Total number of frames currently allocated by this buffer manager.
+ private int totalFrameCount = 0;
public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx,
IPartitionedMemoryConstrain constrain,
int partitions, int frameLimitInBytes) throws HyracksDataException
{
@@ -71,6 +73,12 @@
}
Arrays.fill(numTuples, 0);
appendFrame.reset(null);
+ totalFrameCount = 0;
+ }
+
+ @Override
+ public int getFrameCount() {
+ return totalFrameCount;
}
@Override
@@ -101,6 +109,7 @@
if (partition != null) {
for (int i = 0; i < partition.getNumFrames(); ++i) {
framePool.deAllocateBuffer(partition.getFrame(i,
tempInfo).getBuffer());
+ totalFrameCount--;
}
}
partitionArray[partitionId].reset();
@@ -163,6 +172,8 @@
ByteBuffer newBuffer = requestNewBufferFromPool(size);
if (newBuffer == null) {
return -1;
+ } else {
+ totalFrameCount++;
}
appendFrame.reset(newBuffer);
appender.reset(appendFrame, true);
@@ -274,4 +285,5 @@
}
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f08d27d..f62360c 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -140,7 +140,7 @@
}
@Override
- public boolean insert(IFrameTupleAccessor accessor, int tIndex)
throws HyracksDataException {
+ public InsertResultType insert(IFrameTupleAccessor accessor, int
tIndex) throws HyracksDataException {
int entryInHashTable = tpc.partition(accessor, tIndex,
tableSize);
for (int i = 0; i <
hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) {
hashTableForTuplePointer.getTuplePointer(entryInHashTable,
i, pointer);
@@ -148,24 +148,30 @@
int c = ftpcInputCompareToAggregate.compare(accessor,
tIndex, bufferAccessor);
if (c == 0) {
aggregateExistingTuple(accessor, tIndex,
bufferAccessor, pointer.getTupleIndex());
- return true;
+ return InsertResultType.SUCCESS;
}
}
return insertNewAggregateEntry(entryInHashTable, accessor,
tIndex);
}
- private boolean insertNewAggregateEntry(int entryInHashTable,
IFrameTupleAccessor accessor, int tIndex)
+ private InsertResultType insertNewAggregateEntry(int
entryInHashTable, IFrameTupleAccessor accessor,
+ int tIndex)
throws HyracksDataException {
initStateTupleBuilder(accessor, tIndex);
int pid = getPartition(entryInHashTable);
if (!bufferManager.insertTuple(pid,
stateTupleBuilder.getByteArray(),
stateTupleBuilder.getFieldEndOffsets(), 0,
stateTupleBuilder.getSize(), pointer)) {
- return false;
+ return InsertResultType.FAIL;
}
hashTableForTuplePointer.insert(entryInHashTable, pointer);
- return true;
+ // If the number of frames allocated to the data table and hash table reaches or exceeds the frame limit,
+ // we need to spill a partition to disk to make space.
+ if (bufferManager.getFrameCount() +
hashTableForTuplePointer.getFrameCount() >= framesLimit) {
+ return InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET;
+ }
+ return InsertResultType.SUCCESS;
}
private void initStateTupleBuilder(IFrameTupleAccessor accessor,
int tIndex) throws HyracksDataException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
index 2b9ad54..970ea4a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
@@ -21,9 +21,19 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public interface ISpillableTable {
+
+ /**
+ * Result Type for an insertion.
+ */
+ public static enum InsertResultType {
+ SUCCESS,
+ FAIL,
+ // If a memory budget is given and if an insertion is successful,
+ // but exceeds the given budget, we return this code.
+ SUCCESS_BUT_EXCEEDS_BUDGET
+ }
/**
* Release all the storage resources.
@@ -45,7 +55,7 @@
* @return
* @throws HyracksDataException
*/
- boolean insert(IFrameTupleAccessor accessor, int tIndex) throws
HyracksDataException;
+ InsertResultType insert(IFrameTupleAccessor accessor, int tIndex) throws
HyracksDataException;
/**
* Flush the certain partition to writer, and return the numOfTuples that
have been flushed
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index e0ef2b3..ffd642a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.group.AggregateType;
import org.apache.hyracks.dataflow.std.group.ISpillableTable;
+import org.apache.hyracks.dataflow.std.group.ISpillableTable.InsertResultType;
public class ExternalHashGroupBy {
@@ -50,7 +51,8 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
- if (!table.insert(accessor, i)) {
+ InsertResultType result = table.insert(accessor, i);
+ if (result == InsertResultType.FAIL || result ==
InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET) {
do {
int partition = table.findVictimPartition(accessor, i);
if (partition < 0) {
@@ -58,7 +60,13 @@
}
RunFileWriter writer =
getPartitionWriterOrCreateOneIfNotExist(partition);
flushPartitionToRun(partition, writer);
- } while (!table.insert(accessor, i));
+ if (result == InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET)
{
+ // If the result type is SUCCESS_BUT_EXCEEDS_BUDGET,
+ // then we don't need to re-insert this tuple.
+ break;
+ }
+ result = table.insert(accessor, i);
+ } while (result != InsertResultType.SUCCESS);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index b42cdb7..ca43e1e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -25,22 +25,33 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
- * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex;
+ * This table consists of header frames and content frames.
+ * <p>
+ * A header slot locates the entry slot for a given entry (hash bucket index).
+ * It consists of [content frame number], [offset in that frame], which point to
+ * the first tuple pointer stored for that entry.
+ * <p>
+ * An entry slot in a content frame has the following layout:
+ * [capacity of the slot], [# of occupied elements], {[frameIndex], [tupleIndex]}+;
* fIndex, tIndex; .... <fIndex, tIndex> forms a tuple pointer
*/
public class SerializableHashTable implements ISerializableTable {
private static final int INT_SIZE = 4;
+ // Initial entry slot size
private static final int INIT_ENTRY_SIZE = 4;
+ // Header frame array
private IntSerDeBuffer[] headers;
+ // Content frame list
private List<IntSerDeBuffer> contents = new ArrayList<>();
- private List<Integer> frameCurrentIndex = new ArrayList<>();
+ private List<Integer> currentOffsetInFrameList = new ArrayList<>();
private final IHyracksFrameMgrContext ctx;
private final int frameCapacity;
- private int currentLargestFrameIndex = 0;
+ private int currentLargestFrameNumber = 0;
private int tupleCount = 0;
- private int headerFrameCount = 0;
+ // The total number of frames allocated to the headers and contents
+ private int totalFrameCount = 0;
private TuplePointer tempTuplePointer = new TuplePointer();
public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext
ctx) throws HyracksDataException {
@@ -53,61 +64,65 @@
IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
contents.add(frame);
- frameCurrentIndex.add(0);
+ totalFrameCount++;
+ currentOffsetInFrameList.add(0);
frameCapacity = frame.capacity();
}
@Override
public void insert(int entry, TuplePointer pointer) throws
HyracksDataException {
- int hFrameIndex = getHeaderFrameIndex(entry);
- int headerOffset = getHeaderFrameOffset(entry);
- IntSerDeBuffer header = headers[hFrameIndex];
- if (header == null) {
- header = new IntSerDeBuffer(ctx.allocateFrame().array());
- headers[hFrameIndex] = header;
- resetFrame(header);
- headerFrameCount++;
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+ IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+ if (headerFrame == null) {
+ headerFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
+ headers[headerFrameIndex] = headerFrame;
+ resetFrame(headerFrame);
+ totalFrameCount++;
}
- int frameIndex = header.getInt(headerOffset);
- int offsetIndex = header.getInt(headerOffset + 1);
- if (frameIndex < 0) {
- // insert first tuple into the entry
- insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+ int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+ if (contentFrameIndex < 0) {
+ // Since the initial value of the frame index and offset is -1, the entry slot for
+ // this entry has not been created yet. So, create the entry slot and insert the first tuple into that slot.
+ insertNewEntry(headerFrame, offsetInHeaderFrame, INIT_ENTRY_SIZE,
pointer);
} else {
- // insert non-first tuple into the entry
- insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex,
pointer);
+ // The entry slot already exists. Insert non-first tuple into the
entry slot
+ int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame
+ 1);
+ insertNonFirstTuple(headerFrame, offsetInHeaderFrame,
contentFrameIndex, offsetInContentFrame, pointer);
}
tupleCount++;
}
@Override
+ // Resets the slot for the entry by setting its used count to 0 (the slot's storage is kept).
public void delete(int entry) {
- int hFrameIndex = getHeaderFrameIndex(entry);
- int headerOffset = getHeaderFrameOffset(entry);
- IntSerDeBuffer header = headers[hFrameIndex];
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[headerFrameIndex];
if (header != null) {
- int frameIndex = header.getInt(headerOffset);
- int offsetIndex = header.getInt(headerOffset + 1);
- if (frameIndex >= 0) {
- IntSerDeBuffer frame = contents.get(frameIndex);
- int entryUsedItems = frame.getInt(offsetIndex + 1);
- frame.writeInt(offsetIndex + 1, 0);
- tupleCount -= entryUsedItems;
+ int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+ int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+ if (contentFrameIndex >= 0) {
+ IntSerDeBuffer frame = contents.get(contentFrameIndex);
+ int entryUsedCountInSlot = frame.getInt(offsetInContentFrame +
1);
+ // Set used count as 0 in the slot.
+ frame.writeInt(offsetInContentFrame + 1, 0);
+ tupleCount -= entryUsedCountInSlot;
}
}
}
@Override
public boolean getTuplePointer(int entry, int offset, TuplePointer
dataPointer) {
- int hFrameIndex = getHeaderFrameIndex(entry);
- int headerOffset = getHeaderFrameOffset(entry);
- IntSerDeBuffer header = headers[hFrameIndex];
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[headerFrameIndex];
if (header == null) {
dataPointer.reset(-1, -1);
return false;
}
- int frameIndex = header.getInt(headerOffset);
- int offsetIndex = header.getInt(headerOffset + 1);
+ int frameIndex = header.getInt(offsetInHeaderFrame);
+ int offsetIndex = header.getInt(offsetInHeaderFrame + 1);
if (frameIndex < 0) {
dataPointer.reset(-1, -1);
return false;
@@ -134,18 +149,18 @@
if (frame != null)
resetFrame(frame);
- frameCurrentIndex.clear();
+ currentOffsetInFrameList.clear();
for (int i = 0; i < contents.size(); i++) {
- frameCurrentIndex.add(0);
+ currentOffsetInFrameList.add(0);
}
- currentLargestFrameIndex = 0;
+ currentLargestFrameNumber = 0;
tupleCount = 0;
}
@Override
public int getFrameCount() {
- return headerFrameCount + contents.size();
+ return totalFrameCount;
}
@Override
@@ -154,17 +169,20 @@
}
@Override
+ /**
+ * Returns the number of tuple pointers stored in the slot for the given entry (0 if the slot does not exist).
+ */
public int getTupleCount(int entry) {
- int hFrameIndex = getHeaderFrameIndex(entry);
- int headerOffset = getHeaderFrameOffset(entry);
- IntSerDeBuffer header = headers[hFrameIndex];
- if (header != null) {
- int frameIndex = header.getInt(headerOffset);
- int offsetIndex = header.getInt(headerOffset + 1);
- if (frameIndex >= 0) {
- IntSerDeBuffer frame = contents.get(frameIndex);
- int entryUsedItems = frame.getInt(offsetIndex + 1);
- return entryUsedItems;
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+ IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+ if (headerFrame != null) {
+ int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+ int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame
+ 1);
+ if (contentFrameIndex >= 0) {
+ IntSerDeBuffer frame = contents.get(contentFrameIndex);
+ int entryUsedCountInSlot = frame.getInt(offsetInContentFrame +
1);
+ return entryUsedCountInSlot;
}
}
return 0;
@@ -176,102 +194,134 @@
for (int i = 0; i < headers.length; i++)
headers[i] = null;
contents.clear();
- frameCurrentIndex.clear();
+ currentOffsetInFrameList.clear();
tupleCount = 0;
- currentLargestFrameIndex = 0;
+ totalFrameCount = 0;
+ currentLargestFrameNumber = 0;
ctx.deallocateFrames(nFrames);
}
- private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int
entryCapacity, TuplePointer pointer)
+ private void insertNewEntry(IntSerDeBuffer header, int
offsetInHeaderFrame, int entryCapacity, TuplePointer pointer)
throws HyracksDataException {
- IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
- int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+ IntSerDeBuffer lastContentFrame =
contents.get(currentLargestFrameNumber);
+ int lastOffsetInCurrentFrame =
currentOffsetInFrameList.get(currentLargestFrameNumber);
int requiredIntCapacity = entryCapacity * 2;
- int startFrameIndex = currentLargestFrameIndex;
+ int currentFrameNumber = currentLargestFrameNumber;
- if (lastIndex + requiredIntCapacity >= frameCapacity) {
- IntSerDeBuffer newFrame;
- startFrameIndex++;
+ if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) {
+ IntSerDeBuffer newContentFrame;
+ currentFrameNumber++;
do {
- if (currentLargestFrameIndex >= contents.size() - 1) {
- newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
- currentLargestFrameIndex++;
- contents.add(newFrame);
- frameCurrentIndex.add(0);
+ if (currentLargestFrameNumber >= contents.size() - 1) {
+ newContentFrame = new
IntSerDeBuffer(ctx.allocateFrame().array());
+ currentLargestFrameNumber++;
+ contents.add(newContentFrame);
+ totalFrameCount++;
+ currentOffsetInFrameList.add(0);
} else {
- currentLargestFrameIndex++;
- frameCurrentIndex.set(currentLargestFrameIndex, 0);
+ currentLargestFrameNumber++;
+ currentOffsetInFrameList.set(currentLargestFrameNumber, 0);
}
requiredIntCapacity -= frameCapacity;
} while (requiredIntCapacity > 0);
- lastIndex = 0;
- lastFrame = contents.get(startFrameIndex);
+ lastOffsetInCurrentFrame = 0;
+ lastContentFrame = contents.get(currentFrameNumber);
}
// set header
- header.writeInt(headerOffset, startFrameIndex);
- header.writeInt(headerOffset + 1, lastIndex);
+ header.writeInt(offsetInHeaderFrame, currentFrameNumber);
+ header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame);
- // set the entry
- lastFrame.writeInt(lastIndex, entryCapacity - 1);
- lastFrame.writeInt(lastIndex + 1, 1);
- lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex());
- lastFrame.writeInt(lastIndex + 3, pointer.getTupleIndex());
- int newLastIndex = lastIndex + entryCapacity * 2;
- newLastIndex = newLastIndex < frameCapacity ? newLastIndex :
frameCapacity - 1;
- frameCurrentIndex.set(startFrameIndex, newLastIndex);
+ // set the entry & its slot.
+ // 1. slot capacity
+ lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1);
+ // 2. used count in the slot
+ lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1);
+ // 3. initial entry in the slot
+ lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2,
pointer.getFrameIndex());
+ lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3,
pointer.getTupleIndex());
+ int newLastOffsetInContentFrame = lastOffsetInCurrentFrame +
entryCapacity * 2;
+ newLastOffsetInContentFrame = newLastOffsetInContentFrame <
frameCapacity ? newLastOffsetInContentFrame
+ : frameCapacity - 1;
+ currentOffsetInFrameList.set(currentFrameNumber,
newLastOffsetInContentFrame);
- requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+ requiredIntCapacity = entryCapacity * 2 - (frameCapacity -
lastOffsetInCurrentFrame);
while (requiredIntCapacity > 0) {
- startFrameIndex++;
+ currentFrameNumber++;
requiredIntCapacity -= frameCapacity;
- newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity +
frameCapacity : frameCapacity - 1;
- frameCurrentIndex.set(startFrameIndex, newLastIndex);
+ newLastOffsetInContentFrame = requiredIntCapacity < 0 ?
requiredIntCapacity + frameCapacity
+ : frameCapacity - 1;
+ currentOffsetInFrameList.set(currentFrameNumber,
newLastOffsetInContentFrame);
}
}
- private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset,
int frameIndexArg, int offsetIndex,
+ private void insertNonFirstTuple(IntSerDeBuffer header, int
offsetInHeaderFrame, int contentFrameIndex,
+ int offsetInContentFrame,
TuplePointer pointer) throws HyracksDataException {
- int frameIndex = frameIndexArg;
- IntSerDeBuffer frame = contents.get(frameIndex);
- int entryItems = frame.getInt(offsetIndex);
- int entryUsedItems = frame.getInt(offsetIndex + 1);
-
- if (entryUsedItems < entryItems) {
- frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
- int startIndex = offsetIndex + 2 + entryUsedItems * 2;
- while (startIndex >= frameCapacity) {
+ int frameIndex = contentFrameIndex;
+ IntSerDeBuffer contentFrame = contents.get(frameIndex);
+ int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame);
+ int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame +
1);
+ boolean frameIndexChanged = false;
+ if (entryUsedCountInSlot < entrySlotCapacity) {
+ // The slot has room for at least one more tuple pointer.
+ // Increase used count.
+ contentFrame.writeInt(offsetInContentFrame + 1,
entryUsedCountInSlot + 1);
+ // Calculate the first empty spot in the slot.
+ // +2: (capacity, # of used entry count)
+ // *2: each tuplePointer's occupation (frame index + offset in
that frame)
+ int startOffsetInContentFrame = offsetInContentFrame + 2 +
entryUsedCountInSlot * 2;
+ while (startOffsetInContentFrame >= frameCapacity) {
++frameIndex;
- startIndex -= frameCapacity;
+ startOffsetInContentFrame -= frameCapacity;
+ frameIndexChanged = true;
}
- frame = contents.get(frameIndex);
- frame.writeInt(startIndex, pointer.getFrameIndex());
- frame.writeInt(startIndex + 1, pointer.getTupleIndex());
+ // We don't have to read content frame again if the frame index
has not been changed.
+ if (frameIndexChanged) {
+ contentFrame = contents.get(frameIndex);
+ }
+ contentFrame.writeInt(startOffsetInContentFrame,
pointer.getFrameIndex());
+ contentFrame.writeInt(startOffsetInContentFrame + 1,
pointer.getTupleIndex());
} else {
- int capacity = (entryItems + 1) * 2;
- header.writeInt(headerOffset, -1);
- header.writeInt(headerOffset + 1, -1);
- int fIndex = frame.getInt(offsetIndex + 2);
- int tIndex = frame.getInt(offsetIndex + 3);
- tempTuplePointer.reset(fIndex, tIndex);
- this.insertNewEntry(header, headerOffset, capacity,
tempTuplePointer);
- int newFrameIndex = header.getInt(headerOffset);
- int newTupleIndex = header.getInt(headerOffset + 1);
+ // There is not enough space in this slot. We need to increase the slot size and
+ // migrate the current entries in it.
- for (int i = 1; i < entryUsedItems; i++) {
- int startIndex = offsetIndex + 2 + i * 2;
+ // New capacity: double the original capacity
+ int capacity = (entrySlotCapacity + 1) * 2;
+ // Temporarily set the header as -1 for the slot.
+ header.writeInt(offsetInHeaderFrame, -1);
+ header.writeInt(offsetInHeaderFrame + 1, -1);
+ // Get the location of the initial entry.
+ int fIndex = contentFrame.getInt(offsetInContentFrame + 2);
+ int tIndex = contentFrame.getInt(offsetInContentFrame + 3);
+ tempTuplePointer.reset(fIndex, tIndex);
+ // Create a new double-sized slot for the current entries and
+ // migrate the initial entry in the slot to the new slot.
+ this.insertNewEntry(header, offsetInHeaderFrame, capacity,
tempTuplePointer);
+ int newFrameIndex = header.getInt(offsetInHeaderFrame);
+ int newTupleIndex = header.getInt(offsetInHeaderFrame + 1);
+ boolean firstIterInTheLoop = true;
+
+ // Migrate the existing entries (from 2nd to the last).
+ for (int i = 1; i < entryUsedCountInSlot; i++) {
+ int startOffsetInContentFrame = offsetInContentFrame + 2 + i *
2;
int startFrameIndex = frameIndex;
- while (startIndex >= frameCapacity) {
+ while (startOffsetInContentFrame >= frameCapacity) {
++startFrameIndex;
- startIndex -= frameCapacity;
+ startOffsetInContentFrame -= frameCapacity;
}
- frame = contents.get(startFrameIndex);
- fIndex = frame.getInt(startIndex);
- tIndex = frame.getInt(startIndex + 1);
+ if (firstIterInTheLoop || startFrameIndex != frameIndex) {
+ // Re-read the content frame on the first iteration, or when the entry lies in a frame other than the slot's starting frame.
+ contentFrame = contents.get(startFrameIndex);
+ firstIterInTheLoop = false;
+ }
+ fIndex = contentFrame.getInt(startOffsetInContentFrame);
+ tIndex = contentFrame.getInt(startOffsetInContentFrame + 1);
tempTuplePointer.reset(fIndex, tIndex);
- insertNonFirstTuple(header, headerOffset, newFrameIndex,
newTupleIndex, tempTuplePointer);
+ insertNonFirstTuple(header, offsetInHeaderFrame,
newFrameIndex, newTupleIndex, tempTuplePointer);
}
- insertNonFirstTuple(header, headerOffset, newFrameIndex,
newTupleIndex, pointer);
+ // Finally, insert the new tuple pointer that overflowed the old slot.
+ insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex,
newTupleIndex, pointer);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1056
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <[email protected]>