Yingyi Bu has posted comments on this change.

Change subject: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
......................................................................

Patch Set 16:

(59 comments)

https://asterix-gerrit.ics.uci.edu/#/c/398/16/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java:

Line 71: new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
Why unlimited memory?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java:

Line 129: System.err.flush();
Do not flush?

Line 150: sb.append("ExceptionWhenDeserialzingField ");
Better error message?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java:

Line 28: public GeneratedRunFileReader(FileReference file, IIOManager ioManager, long size, boolean deleteAfterRead,
Remove "public", since it is only supposed to be "generated" by a file writer.

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java:

Line 27: public abstract class AbstractTupleBufferPointAccessor implements ITupleBufferPointAccessor {
"AbstractTupleBufferPointAccessor" -> "AbstractTuplePointerAccessor"?
"ITupleBufferPointAccessor" -> "ITuplePointerAccessor"?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java:

Line 114: throw new IllegalAccessError("Please call deAllocateBuffer to return the used buffer.");
Why throw an exception here? The comment on the IFramePool interface says "Reset the counters to initial status".

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java:

Line 30: void reset() throws HyracksDataException;
Add a description, e.g., how does it differ from close()?

Line 58: boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
Switch the order of byte[] byteArray and int[] fieldEndOffsets: fieldEndOffsets is essentially a descriptor of one tuple, so it should sit together with start and size.

Line 61: boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, int tid, TuplePointer tempPtr)
Add a comment for this method. Maybe unify the parameter names, e.g.:
pid -> partition
tempPtr -> pointer
accessorBuild -> tupleAccessor

Line 75: boolean insertTupleToSpilledPartition(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
Switch the order of byte[] byteArray and int[] fieldEndOffsets: fieldEndOffsets is essentially a descriptor of one tuple, so it should sit together with start and size.

Line 81: void close();
Describe the contract of this method?

Line 83: ITupleBufferPointAccessor getTupleAccessor(RecordDescriptor recordDescriptor);
Why do you need the recordDescriptor?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java:

Line 31: public interface ITupleBufferPointAccessor extends IFrameTupleAccessor {
ITupleBufferPointAccessor -> IFrameTuplePointerAccessor?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java:

Line 41: public class VGroupTupleBufferManager implements IPartitionedTupleBufferManager {
Why is it called "Group"? It makes me think this is something used only by the group operator, but it is also used by the join operator. Maybe "Group" -> "Partition"?

Line 109: public boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
Change the order of arguments?

Line 132: public boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, int tid, TuplePointer tempPtr)
pid -> partition
accessorBuild -> tupleAccessor
tempPtr -> pointer

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java:

Line 49: private static final int DEFAULT_TUPLE_PER_FRAME = 10;
Where do you get this number? From the join operator?

Line 50: private static double factor = 1.1;
"factor" --> "FUDGE_FACTOR"
"static" --> "static final"

Line 59: public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize, long fileSize,
fileSize -> dataByteSize?

Line 74: final FrameTuplePairComparator ftpcInputVSAggregate = new FrameTuplePairComparator(keyFields, aggregatedKeys,
What does VSAggregate mean?

Line 81: outRecordDescriptor, keyFields, aggregatedKeys, null);
aggregatedKeys --> intermediateKeys?

Line 95: if (LOGGER.isLoggable(Level.FINE)) {
The if-check does not seem necessary.

Line 105: final ISerializableTable metaTable = new SerializableHashTable(tableSize, ctx);
metaTable --> hashTableForTuplePointers

Line 112: private BitSet flushedSet = new BitSet(numPhysicalPartitions);
"flushedSet" --> "spilledSet"

Line 132: return logicalPartition % numPhysicalPartitions;
Would it be better to use logicalPartition / numPhysicalPartitions? Then pointers for the same partition would be physically aligned together, and iterating over a partition could potentially be more efficient.

Line 139: private int getNextLogicalPartition(int curLogicalPartition) {
The terms "logical partition" and "physical partition" confuse me a bit. I spent some effort figuring out what they actually are. It looks like there are standard terms for them:
1. "logical partition" -> "entryInHashTable"
2. "physical partition" --> "partition"

Line 248: int flushedPartition = findMaxSizeIdInFlushedPartitions();
"findMaxSizeIdInFlushedPartitions()" -> "findSpilledPartitionWithLargestMemoryUsage"?

Line 250: return findMaxSizeIdInMemPartitions();
"findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"

Line 252: // if this maxSize partition is too small to flush, we need to find a better candidate
Maybe rephrase the description: "If the victim partition only has one in-memory partition, we need to find another victim. We need to maintain at least one page for a spilled partition to avoid disk thrashing." (Maybe my understanding is wrong.)

Line 254: && flushedSet.nextClearBit(0) >= 0) {
Can you wrap "flushedSet.nextClearBit(0)>=0" as a private method with a meaningful name, e.g., existsInMemoryPartitions()?
This is the only place in findVictimPartition that refers to flushedSet. It would be nice to factor out the details of flushedSet.

Line 255: int max = findMaxSizeIdInMemPartitions();
"findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"

Line 295: private int getNumOfPartitions(int tableSize, int fileSizeInFrames, int memSize) {
fileSizeInFrames --> numFramesForData?
memSize --> frameLimit?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java:

Line 39: * @param tupleBuilder ?
Reorder the params and remove the "?".

Line 83: * @return TODO
"TODO" -> ...

Line 99: * @return TODO
"TODO" --> ...

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java:

Line 28: void close() throws HyracksDataException;
Add some description for close(), clear(), and insert()?

Line 51: * The {@code accessor} and {@code tIndex} given the reference to the tuple tobe inserted.
"tobe" --> "to be"

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java:

Line 55: private ExternalHashGroupBy externalGroup;
"externalGroup" --> "externalGroupBy"

Line 101: //do nothing for failures
It seems that we need a flag to indicate whether there is a failure in the data producer of this operator.

Line 107: ctx.setStateObject(state);
If there is a failure, we should not set the state.

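A minimal sketch of the failure handling asked for in the comments on lines 101 and 107, assuming a hypothetical BuildSideSketch class rather than the actual Hyracks operator. Every name in it is illustrative and not taken from the patch: fail() only records the failure, and close() publishes the state only when no failure was seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a build-side operator; this is NOT the Hyracks API.
final class BuildSideSketch {
    private final List<String> runFiles = new ArrayList<>(); // stands in for temporary run files
    private boolean failed = false;  // set when the data producer reports a failure
    private Object publishedState;   // stands in for ctx.setStateObject(state)

    void addRunFile(String name) {
        runFiles.add(name);
    }

    // Called when the upstream producer fails: remember it instead of ignoring it.
    void fail() {
        failed = true;
    }

    // Publish the state only on success; on failure, drop the run files instead.
    void close() {
        if (failed) {
            runFiles.clear(); // a real operator would delete the files here
            return;
        }
        publishedState = new ArrayList<>(runFiles);
    }

    boolean hasPublishedState() {
        return publishedState != null;
    }

    int pendingRunFiles() {
        return runFiles.size();
    }
}
```

Whether the real operator can release its run files at this point depends on who owns them, which the sketch does not model.
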
Line 120: externalGroup = null;
In case there is a failure, we still want to clean up the temporary run files.

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java:

Line 42: public class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
"ExternalGroupMergeOperatorNodePushable" --> "ExternalGroupWriteOperatorNodePushable", because we don't do a "merge" here. "Write" might be the right term since it writes out all group-by states.

Line 96: writer.fail();
In case there is a failure in its data consumer, we should still clean up leftover run files, if any.

Line 115: ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, sizeInTuples[i],
"sizeInTuples" --> "numOfTuples"?

Line 143: groupBy.build(frame.getBuffer());
"build" --> "insert"?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java:

Line 145: keyFields, firstNormalizerFactory, comparatorFactories, mergerFactory);
mergerFactory --> intermediateAggregateFactory

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java:

Line 46: class GroupingHashTable {
Should we remove this one, in case it confuses people? It is an object-based implementation which shouldn't ever be used.

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java:

Line 38: public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
Remove this one?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java:

Line 83: private final INullWriter[] nullWriters1;
"nullWriters1" --> "nullWriters"

Line 95: private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
Remove unused variable.

Line 98: private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoin.class.getName());
Remove unused variable.

Line 103: private TuplePointer tempPtr = new TuplePointer();
It seems nothing in this class reads tempPtr. If that's the case, can we remove this and just pass null for insertTuples()?

Line 163: if (spilledStatus.get(tid)) {
It seems this should be pid, correct?

Line 164: while (!bufferManager.insertTupleToSpilledPartition(pid, accessorBuild, tid, tempPtr)) {
tempPtr --> null? And allow bufferManager.insertTupleToSpilledPartition to deal with the case where tempPtr is null?

Line 183: private int selectVictim(int pid) {
Could the findVictim code be unified for group-by and join? Can we factor this logic out as something like MemoryManagementPolicy, which holds a reference to the IPatitionBufferManager and contains the spilledStatus BitSet internally?

Line 186: return pid;
Why does this need to be different from what group-by's find-victim does? Is this very important according to the experiments?

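To make the MemoryManagementPolicy suggestion on line 183 concrete, here is a rough sketch under stated assumptions. SpillPolicySketch and all of its members are hypothetical names, and the policy itself (prefer a spilled partition that still holds more than one frame, otherwise take the largest in-memory partition) is only a reading of the method names quoted in this review, not the patch's actual logic. The sketch keeps the spilled BitSet private, so callers never touch it directly. One detail it relies on: java.util.BitSet.nextClearBit never returns a negative index, so an "are there in-memory partitions left" check has to compare against the partition count.

```java
import java.util.BitSet;

// Hypothetical policy object; names are illustrative, NOT the Hyracks API.
final class SpillPolicySketch {
    private final int[] framesInMemory; // frames currently held per partition
    private final BitSet spilled;       // bit i set => partition i has been spilled

    SpillPolicySketch(int numPartitions) {
        this.framesInMemory = new int[numPartitions];
        this.spilled = new BitSet(numPartitions);
    }

    void setFrames(int partition, int frames) {
        framesInMemory[partition] = frames;
    }

    void markSpilled(int partition) {
        spilled.set(partition);
    }

    // nextClearBit is never negative, so compare it with the number of partitions.
    boolean existsInMemoryPartition() {
        return spilled.nextClearBit(0) < framesInMemory.length;
    }

    // Largest partition whose spilled bit equals wantSpilled, or -1 if there is none.
    private int findLargest(boolean wantSpilled) {
        int best = -1;
        for (int p = 0; p < framesInMemory.length; p++) {
            if (spilled.get(p) == wantSpilled
                    && (best < 0 || framesInMemory[p] > framesInMemory[best])) {
                best = p;
            }
        }
        return best;
    }

    // Prefer an already spilled partition holding more than one frame;
    // otherwise fall back to the largest in-memory partition.
    int findVictim() {
        int victim = findLargest(true);
        if (victim >= 0 && framesInMemory[victim] > 1) {
            return victim;
        }
        if (existsInMemoryPartition()) {
            return findLargest(false);
        }
        return victim; // -1 only when there are no partitions at all
    }
}
```

A join-specific rule, such as returning the current pid in some cases, could be layered on top by the caller or supplied as a second implementation of the same policy.
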
Line 188: int partitionToSpill = selectLargestSpilledPartition();
selectLargestSpilledPartition() -> findSpilledPartitionWithMaxMemoryUsage?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java:

Line 303: ctx.setStateObject(state);
Do not set the state in the case that a failure happens?

Line 306: }
Clean up temporary files in case a failure happens?

--
To view, visit https://asterix-gerrit.ics.uci.edu/398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Gerrit-PatchSet: 16
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Jianfeng Jia
Gerrit-Reviewer: Jenkins
Gerrit-Reviewer: Jianfeng Jia
Gerrit-Reviewer: Pouria Pirzadeh
Gerrit-Reviewer: Preston Carman
Gerrit-Reviewer: Till Westmann
Gerrit-Reviewer: Yingyi Bu
Gerrit-HasComments: Yes
