Taewoo Kim has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1056
Change subject: ASTERIXDB-1566: Fixed External Hash Group By to conform to the memory budget ...................................................................... ASTERIXDB-1566: Fixed External Hash Group By to conform to the memory budget - External Hash Group By now conforms to the memory budget (compiler.groupmemory) - This is a temporary fix and another patchset will be submitted soon after we get the final design. Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java 8 files changed, 255 insertions(+), 133 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/56/1056/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index cd088c1..109d49d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.PrintWriter; +import java.math.BigInteger; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; @@ -248,6 +249,27 @@ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit); OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit); + // To set the number of hash values (slots) in the external group-by hash table, + // we try to find a prime number in the range between ceil(0.8 * groupFrameLimit * framesize / 8) and + // ceil(0.9 * groupFrameLimit * framesize / 8). + // Here, we assume that each hash entry pointer occupies 8 bytes (frame, offset). + // So, the number of hash entry pointers in a frame is ceil(framesize / 8). + // The reason why we set the upper limit to 90% is that if the pointers occupied 100% of the capacity, + // there would be no space left for storing the actual tuples. + int hashEntryMinSize = (int) Math.ceil(0.8 * groupFrameLimit * frameSize / 8); + int hashEntryMaxSize = (int) Math.ceil(0.9 * groupFrameLimit * frameSize / 8); + BigInteger tableSizePrimeNumber = BigInteger.valueOf(hashEntryMinSize).nextProbablePrime(); + double capacityOccupationRatio = 0.8; + // If this prime exceeds the 90% bound, we keep searching for a prime that fits within the budget. + while (tableSizePrimeNumber.longValue() > hashEntryMaxSize) { + // Reduce the ratio by 2% and try to find a prime number again.
+ capacityOccupationRatio -= 0.02; + hashEntryMinSize = (int) Math.ceil(capacityOccupationRatio * groupFrameLimit * frameSize / 8); + tableSizePrimeNumber = BigInteger.valueOf(hashEntryMinSize).nextProbablePrime(); + } + OptimizationConfUtil.getPhysicalOptimizationConfig() + .setExternalGroupByTableSize(tableSizePrimeNumber.intValue()); + HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(AqlOptimizationContextFactory.INSTANCE); builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig()); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java index a0a9ab0..db9d7e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java @@ -27,6 +27,9 @@ public interface IPartitionedTupleBufferManager { + // Return the number of entire frames that are allocated to this buffer manager. + int getFrameCount(); + int getNumPartitions(); int getNumTuples(int partition); @@ -37,14 +40,16 @@ * Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start, int size) into * specified partition. The handle is written into the tuplepointer. * <br> - * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the {@code fieldEndOffsets} as NULL + * If {@code byteArray} contains the {@code fieldEndOffsets} already, + * then please set the {@code fieldEndOffsets} as NULL. 
* * @param partition * the id of the partition to insert the tuple into * @param byteArray * the byteArray which contains the tuple * @param fieldEndOffsets - * the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to NULL if the {@code byteArray} already contains the fieldEndOffsets + * the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to NULL + * if the {@code byteArray} already contains the fieldEndOffsets * @param start * the start offset in the {@code byteArray} * @param size @@ -59,7 +64,7 @@ /** * Insert tuple {@code tupleId} from the {@code tupleAccessor} into the given partition. * The returned handle is written into the tuplepointer - * + * * @param partition * the id of the partition to insert the tuple * @param tupleAccessor @@ -67,7 +72,7 @@ * @param tupleId * the id of the tuple from the tupleAccessor * @param pointer - * the returned pointer indicating the handler to later fetch the tuple from the buffer maanager + * the returned pointer indicating the handler to later fetch the tuple from the buffer manager * @return true if the insertion succeed. Otherwise return false. * @throws HyracksDataException */ @@ -75,8 +80,9 @@ throws HyracksDataException; /** - * Reset to the initial states. The previous allocated resources won't be released in order to be used in the next round. - * + * Reset to the initial states. The previous allocated resources won't be released + * in order to be used in the next round. + * * @throws HyracksDataException */ void reset() throws HyracksDataException; @@ -93,7 +99,7 @@ * This partition will not be cleared. * Currently it is used by Join where we flush the inner partition to the join (as a frameWriter), * but we will still keep the inner for the next outer partition. - * + * * @param pid * @param writer * @throws HyracksDataException @@ -102,7 +108,7 @@ /** * Clear the memory occupation of the particular partition. 
- * + * * @param partition * @throws HyracksDataException */ diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java index f9387a9..6751296 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java @@ -40,14 +40,22 @@ } public int selectVictimPartition(int failedToInsertPartition) { - // To avoid flush the half-full frame, it's better to spill itself. + // To avoid flushing a half-full frame, it's better to spill the failed partition itself. if (bufferManager.getNumTuples(failedToInsertPartition) > 0) { return failedToInsertPartition; } + // Try to find a victim partition among the already spilled partitions. + // The reason is that we want to keep in-memory partitions (not yet spilled to disk) as long as possible + // to reduce the overhead of writing to and reading from disk. int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage(); int maxToSpillPartSize = 0; - // if we couldn't find the already spilled partition, or it is too small to flush that one, + // If we couldn't find an already spilled partition, or it is too small to be worth flushing, // try to flush an in memory partition. + // + // Note: right now, the createAtMostOneFrameForSpilledPartitionConstrain we are using for a spilled partition + // enforces that the maximum number of frames for a spilled partition is 1. So, the condition of the following + // if statement is always true, i.e., we need to spill an in-memory partition anyway. + // Once another policy is introduced, this if statement will make more sense.
if (partitionToSpill < 0 || (maxToSpillPartSize = bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) { int partitionInMem = findInMemPartitionWithMaxMemoryUsage(); @@ -81,7 +89,7 @@ /** * Create an constrain for the already spilled partition that it can only use at most one frame. - * + * * @param spillStatus * @return */ diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java index c193c3b..c1c6672 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java @@ -47,6 +47,8 @@ private final FixedSizeFrameTupleAppender appender; private BufferInfo tempInfo; private final IPartitionedMemoryConstrain constrain; + // total number of frames that are allocated + private int totalFrameCount = 0; public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain, int partitions, int frameLimitInBytes) throws HyracksDataException { @@ -71,6 +73,12 @@ } Arrays.fill(numTuples, 0); appendFrame.reset(null); + totalFrameCount = 0; + } + + @Override + public int getFrameCount() { + return totalFrameCount; } @Override @@ -101,6 +109,7 @@ if (partition != null) { for (int i = 0; i < partition.getNumFrames(); ++i) { framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer()); + totalFrameCount--; } } partitionArray[partitionId].reset(); @@ -163,6 +172,8 @@ ByteBuffer newBuffer = requestNewBufferFromPool(size); if (newBuffer == null) { return -1; + } else { + totalFrameCount++; } appendFrame.reset(newBuffer); appender.reset(appendFrame, true); @@ 
-274,4 +285,5 @@ } + } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java index f08d27d..f62360c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java @@ -140,7 +140,7 @@ } @Override - public boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException { + public InsertResultType insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException { int entryInHashTable = tpc.partition(accessor, tIndex, tableSize); for (int i = 0; i < hashTableForTuplePointer.getTupleCount(entryInHashTable); i++) { hashTableForTuplePointer.getTuplePointer(entryInHashTable, i, pointer); @@ -148,24 +148,30 @@ int c = ftpcInputCompareToAggregate.compare(accessor, tIndex, bufferAccessor); if (c == 0) { aggregateExistingTuple(accessor, tIndex, bufferAccessor, pointer.getTupleIndex()); - return true; + return InsertResultType.SUCCESS; } } return insertNewAggregateEntry(entryInHashTable, accessor, tIndex); } - private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex) + private InsertResultType insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, + int tIndex) throws HyracksDataException { initStateTupleBuilder(accessor, tIndex); int pid = getPartition(entryInHashTable); if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(), stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) { - return false; + return InsertResultType.FAIL; } hashTableForTuplePointer.insert(entryInHashTable, pointer); - return true; + // 
If the number of frames allocated to the data table and hash table exceeds the frame limit, + // we need to spill a partition to the disk to make a space. + if (bufferManager.getFrameCount() + hashTableForTuplePointer.getFrameCount() >= framesLimit) { + return InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET; + } + return InsertResultType.SUCCESS; } private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java index 2b9ad54..970ea4a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java @@ -21,9 +21,19 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; public interface ISpillableTable { + + /** + * Result Type for an insertion. + */ + public static enum InsertResultType { + SUCCESS, + FAIL, + // If a memory budget is given and if an insertion is successful, + // but exceeds the given budget, we return this code. + SUCCESS_BUT_EXCEEDS_BUDGET + } /** * Release all the storage resources. 
@@ -45,7 +55,7 @@ * @return * @throws HyracksDataException */ - boolean insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException; + InsertResultType insert(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException; /** * Flush the certain partition to writer, and return the numOfTuples that have been flushed diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java index e0ef2b3..ffd642a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java @@ -27,6 +27,7 @@ import org.apache.hyracks.dataflow.common.io.RunFileWriter; import org.apache.hyracks.dataflow.std.group.AggregateType; import org.apache.hyracks.dataflow.std.group.ISpillableTable; +import org.apache.hyracks.dataflow.std.group.ISpillableTable.InsertResultType; public class ExternalHashGroupBy { @@ -50,7 +51,8 @@ accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); for (int i = 0; i < tupleCount; i++) { - if (!table.insert(accessor, i)) { + InsertResultType result = table.insert(accessor, i); + if (result == InsertResultType.FAIL || result == InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET) { do { int partition = table.findVictimPartition(accessor, i); if (partition < 0) { @@ -58,7 +60,13 @@ } RunFileWriter writer = getPartitionWriterOrCreateOneIfNotExist(partition); flushPartitionToRun(partition, writer); - } while (!table.insert(accessor, i)); + if (result == InsertResultType.SUCCESS_BUT_EXCEEDS_BUDGET) { + // If the result type is SUCCESS_BUT_EXCEEDS_BUDGET, + // then we don't need to re-insert this tuple. 
+ break; + } + result = table.insert(accessor, i); + } while (result != InsertResultType.SUCCESS); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java index b42cdb7..ca43e1e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java @@ -25,22 +25,33 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; /** - * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex; + * This table consists of header frames and content frames. + * . + * Header indicates the first entry slot location for the given integer value. + * A header slot consists of [content frame number], [offset in that frame] to get + * the first tuple's pointer information that shares the same value. + * . + * An entry slot in the content frame is as follows. + * [capacity of the slot], [# of occupied elements], {[frameIndex], [tupleIndex]}+; * fIndex, tIndex; .... 
<fIndex, tIndex> forms a tuple pointer */ public class SerializableHashTable implements ISerializableTable { private static final int INT_SIZE = 4; + // Initial entry slot size private static final int INIT_ENTRY_SIZE = 4; + // Header frame array private IntSerDeBuffer[] headers; + // Content frame list private List<IntSerDeBuffer> contents = new ArrayList<>(); - private List<Integer> frameCurrentIndex = new ArrayList<>(); + private List<Integer> currentOffsetInFrameList = new ArrayList<>(); private final IHyracksFrameMgrContext ctx; private final int frameCapacity; - private int currentLargestFrameIndex = 0; + private int currentLargestFrameNumber = 0; private int tupleCount = 0; - private int headerFrameCount = 0; + // The number of total frames that are allocated to the headers and contents + private int totalFrameCount = 0; private TuplePointer tempTuplePointer = new TuplePointer(); public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException { @@ -53,61 +64,65 @@ IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array()); contents.add(frame); - frameCurrentIndex.add(0); + totalFrameCount++; + currentOffsetInFrameList.add(0); frameCapacity = frame.capacity(); } @Override public void insert(int entry, TuplePointer pointer) throws HyracksDataException { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header == null) { - header = new IntSerDeBuffer(ctx.allocateFrame().array()); - headers[hFrameIndex] = header; - resetFrame(header); - headerFrameCount++; + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer headerFrame = headers[headerFrameIndex]; + if (headerFrame == null) { + headerFrame = new IntSerDeBuffer(ctx.allocateFrame().array()); + headers[headerFrameIndex] = headerFrame; + resetFrame(headerFrame); + totalFrameCount++; } - 
int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex < 0) { - // insert first tuple into the entry - insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer); + int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame); + if (contentFrameIndex < 0) { + // Since the initial value of index and offset is -1, this means that the entry slot for + // this entry is not created yet. So, create the entry slot and insert first tuple into that slot. + insertNewEntry(headerFrame, offsetInHeaderFrame, INIT_ENTRY_SIZE, pointer); } else { - // insert non-first tuple into the entry - insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer); + // The entry slot already exists. Insert non-first tuple into the entry slot + int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1); + insertNonFirstTuple(headerFrame, offsetInHeaderFrame, contentFrameIndex, offsetInContentFrame, pointer); } tupleCount++; } @Override + // Reset the slot information for the entry. Specifically, we reset the number of used count in the slot as 0. 
public void delete(int entry) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[headerFrameIndex]; if (header != null) { - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex >= 0) { - IntSerDeBuffer frame = contents.get(frameIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - frame.writeInt(offsetIndex + 1, 0); - tupleCount -= entryUsedItems; + int contentFrameIndex = header.getInt(offsetInHeaderFrame); + int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1); + if (contentFrameIndex >= 0) { + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1); + // Set used count as 0 in the slot. + frame.writeInt(offsetInContentFrame + 1, 0); + tupleCount -= entryUsedCountInSlot; } } } @Override public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer header = headers[headerFrameIndex]; if (header == null) { dataPointer.reset(-1, -1); return false; } - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); + int frameIndex = header.getInt(offsetInHeaderFrame); + int offsetIndex = header.getInt(offsetInHeaderFrame + 1); if (frameIndex < 0) { dataPointer.reset(-1, -1); return false; @@ -134,18 +149,18 @@ if (frame != null) resetFrame(frame); - frameCurrentIndex.clear(); + currentOffsetInFrameList.clear(); for (int i = 0; i < contents.size(); i++) { - 
frameCurrentIndex.add(0); + currentOffsetInFrameList.add(0); } - currentLargestFrameIndex = 0; + currentLargestFrameNumber = 0; tupleCount = 0; } @Override public int getFrameCount() { - return headerFrameCount + contents.size(); + return totalFrameCount; } @Override @@ -154,17 +169,20 @@ } @Override + /** + * Returns the tuple count in the slot for the given entry. + */ public int getTupleCount(int entry) { - int hFrameIndex = getHeaderFrameIndex(entry); - int headerOffset = getHeaderFrameOffset(entry); - IntSerDeBuffer header = headers[hFrameIndex]; - if (header != null) { - int frameIndex = header.getInt(headerOffset); - int offsetIndex = header.getInt(headerOffset + 1); - if (frameIndex >= 0) { - IntSerDeBuffer frame = contents.get(frameIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - return entryUsedItems; + int headerFrameIndex = getHeaderFrameIndex(entry); + int offsetInHeaderFrame = getHeaderFrameOffset(entry); + IntSerDeBuffer headerFrame = headers[headerFrameIndex]; + if (headerFrame != null) { + int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame); + int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1); + if (contentFrameIndex >= 0) { + IntSerDeBuffer frame = contents.get(contentFrameIndex); + int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1); + return entryUsedCountInSlot; } } return 0; @@ -176,102 +194,134 @@ for (int i = 0; i < headers.length; i++) headers[i] = null; contents.clear(); - frameCurrentIndex.clear(); + currentOffsetInFrameList.clear(); tupleCount = 0; - currentLargestFrameIndex = 0; + totalFrameCount = 0; + currentLargestFrameNumber = 0; ctx.deallocateFrames(nFrames); } - private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer) + private void insertNewEntry(IntSerDeBuffer header, int offsetInHeaderFrame, int entryCapacity, TuplePointer pointer) throws HyracksDataException { - IntSerDeBuffer lastFrame = 
contents.get(currentLargestFrameIndex); - int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex); + IntSerDeBuffer lastContentFrame = contents.get(currentLargestFrameNumber); + int lastOffsetInCurrentFrame = currentOffsetInFrameList.get(currentLargestFrameNumber); int requiredIntCapacity = entryCapacity * 2; - int startFrameIndex = currentLargestFrameIndex; + int currentFrameNumber = currentLargestFrameNumber; - if (lastIndex + requiredIntCapacity >= frameCapacity) { - IntSerDeBuffer newFrame; - startFrameIndex++; + if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) { + IntSerDeBuffer newContentFrame; + currentFrameNumber++; do { - if (currentLargestFrameIndex >= contents.size() - 1) { - newFrame = new IntSerDeBuffer(ctx.allocateFrame().array()); - currentLargestFrameIndex++; - contents.add(newFrame); - frameCurrentIndex.add(0); + if (currentLargestFrameNumber >= contents.size() - 1) { + newContentFrame = new IntSerDeBuffer(ctx.allocateFrame().array()); + currentLargestFrameNumber++; + contents.add(newContentFrame); + totalFrameCount++; + currentOffsetInFrameList.add(0); } else { - currentLargestFrameIndex++; - frameCurrentIndex.set(currentLargestFrameIndex, 0); + currentLargestFrameNumber++; + currentOffsetInFrameList.set(currentLargestFrameNumber, 0); } requiredIntCapacity -= frameCapacity; } while (requiredIntCapacity > 0); - lastIndex = 0; - lastFrame = contents.get(startFrameIndex); + lastOffsetInCurrentFrame = 0; + lastContentFrame = contents.get(currentFrameNumber); } // set header - header.writeInt(headerOffset, startFrameIndex); - header.writeInt(headerOffset + 1, lastIndex); + header.writeInt(offsetInHeaderFrame, currentFrameNumber); + header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame); - // set the entry - lastFrame.writeInt(lastIndex, entryCapacity - 1); - lastFrame.writeInt(lastIndex + 1, 1); - lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex()); - lastFrame.writeInt(lastIndex + 3, 
pointer.getTupleIndex()); - int newLastIndex = lastIndex + entryCapacity * 2; - newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1; - frameCurrentIndex.set(startFrameIndex, newLastIndex); + // set the entry & its slot. + // 1. slot capacity + lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1); + // 2. used count in the slot + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1); + // 3. initial entry in the slot + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2, pointer.getFrameIndex()); + lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3, pointer.getTupleIndex()); + int newLastOffsetInContentFrame = lastOffsetInCurrentFrame + entryCapacity * 2; + newLastOffsetInContentFrame = newLastOffsetInContentFrame < frameCapacity ? newLastOffsetInContentFrame + : frameCapacity - 1; + currentOffsetInFrameList.set(currentFrameNumber, newLastOffsetInContentFrame); - requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex); + requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastOffsetInCurrentFrame); while (requiredIntCapacity > 0) { - startFrameIndex++; + currentFrameNumber++; requiredIntCapacity -= frameCapacity; - newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1; - frameCurrentIndex.set(startFrameIndex, newLastIndex); + newLastOffsetInContentFrame = requiredIntCapacity < 0 ? 
requiredIntCapacity + frameCapacity + : frameCapacity - 1; + currentOffsetInFrameList.set(currentFrameNumber, newLastOffsetInContentFrame); } } - private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndexArg, int offsetIndex, + private void insertNonFirstTuple(IntSerDeBuffer header, int offsetInHeaderFrame, int contentFrameIndex, + int offsetInContentFrame, TuplePointer pointer) throws HyracksDataException { - int frameIndex = frameIndexArg; - IntSerDeBuffer frame = contents.get(frameIndex); - int entryItems = frame.getInt(offsetIndex); - int entryUsedItems = frame.getInt(offsetIndex + 1); - - if (entryUsedItems < entryItems) { - frame.writeInt(offsetIndex + 1, entryUsedItems + 1); - int startIndex = offsetIndex + 2 + entryUsedItems * 2; - while (startIndex >= frameCapacity) { + int frameIndex = contentFrameIndex; + IntSerDeBuffer contentFrame = contents.get(frameIndex); + int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame); + int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame + 1); + boolean frameIndexChanged = false; + if (entryUsedCountInSlot < entrySlotCapacity) { + // The slot has at least one space to accommodate this tuple pointer. + // Increase used count. + contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot + 1); + // Calculate the first empty spot in the slot. + // +2: (capacity, # of used entry count) + // *2: each tuplePointer's occupation (frame index + offset in that frame) + int startOffsetInContentFrame = offsetInContentFrame + 2 + entryUsedCountInSlot * 2; + while (startOffsetInContentFrame >= frameCapacity) { ++frameIndex; - startIndex -= frameCapacity; + startOffsetInContentFrame -= frameCapacity; + frameIndexChanged = true; } - frame = contents.get(frameIndex); - frame.writeInt(startIndex, pointer.getFrameIndex()); - frame.writeInt(startIndex + 1, pointer.getTupleIndex()); + // We don't have to read content frame again if the frame index has not been changed. 
+ if (frameIndexChanged) { + contentFrame = contents.get(frameIndex); + } + contentFrame.writeInt(startOffsetInContentFrame, pointer.getFrameIndex()); + contentFrame.writeInt(startOffsetInContentFrame + 1, pointer.getTupleIndex()); } else { - int capacity = (entryItems + 1) * 2; - header.writeInt(headerOffset, -1); - header.writeInt(headerOffset + 1, -1); - int fIndex = frame.getInt(offsetIndex + 2); - int tIndex = frame.getInt(offsetIndex + 3); - tempTuplePointer.reset(fIndex, tIndex); - this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer); - int newFrameIndex = header.getInt(headerOffset); - int newTupleIndex = header.getInt(headerOffset + 1); + // There is not enough space in this slot. We need to increase the slot size and + // migrate the current entries in it. - for (int i = 1; i < entryUsedItems; i++) { - int startIndex = offsetIndex + 2 + i * 2; + // New capacity: double the original capacity + int capacity = (entrySlotCapacity + 1) * 2; + // Temporarily set the header as -1 for the slot. + header.writeInt(offsetInHeaderFrame, -1); + header.writeInt(offsetInHeaderFrame + 1, -1); + // Get the location of the initial entry. + int fIndex = contentFrame.getInt(offsetInContentFrame + 2); + int tIndex = contentFrame.getInt(offsetInContentFrame + 3); + tempTuplePointer.reset(fIndex, tIndex); + // Create a new double-sized slot for the current entries and + // migrate the initial entry in the slot to the new slot. + this.insertNewEntry(header, offsetInHeaderFrame, capacity, tempTuplePointer); + int newFrameIndex = header.getInt(offsetInHeaderFrame); + int newTupleIndex = header.getInt(offsetInHeaderFrame + 1); + boolean firstIterInTheLoop = true; + + // Migrate the existing entries (from 2nd to the last).
+ for (int i = 1; i < entryUsedCountInSlot; i++) { + int startOffsetInContentFrame = offsetInContentFrame + 2 + i * 2; int startFrameIndex = frameIndex; - while (startIndex >= frameCapacity) { + while (startOffsetInContentFrame >= frameCapacity) { ++startFrameIndex; - startIndex -= frameCapacity; + startOffsetInContentFrame -= frameCapacity; } - frame = contents.get(startFrameIndex); - fIndex = frame.getInt(startIndex); - tIndex = frame.getInt(startIndex + 1); + if (firstIterInTheLoop || startFrameIndex != frameIndex) { + // Only read the content frame if this is the first iteration or if the frame index has changed. + contentFrame = contents.get(startFrameIndex); + firstIterInTheLoop = false; + } + fIndex = contentFrame.getInt(startOffsetInContentFrame); + tIndex = contentFrame.getInt(startOffsetInContentFrame + 1); tempTuplePointer.reset(fIndex, tIndex); - insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer); + insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, tempTuplePointer); } - insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer); + // Now, insert the new entry that caused the old slot to overflow. + insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, pointer); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1056 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Taewoo Kim <wangs...@yahoo.com>