Jianfeng Jia has posted comments on this change.

Change subject: Implemented the memory-bounded HashGroupby and HashJoin for BigObject
......................................................................

Patch Set 16:

(58 comments)

https://asterix-gerrit.ics.uci.edu/#/c/398/16/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java:

Line 71: new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
> why unlimited memory?
Originally, this part of sort would allocate as many frames as possible from ctx. I think it assumes all the records should be sorted at once. Thus, in the buffer-manager changes I just give it an unlimited budget.

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java:

Line 129: System.err.flush();
> Do not flush?
Done

Line 150: sb.append("ExceptionWhenDeserialzingField ");
> better error message?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java:

Line 28: public GeneratedRunFileReader(FileReference file, IIOManager ioManager, long size, boolean deleteAfterRead,
> remove "public" since it is supposed to be only be "generated" by a file wr
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java:

Line 27: public abstract class AbstractTupleBufferPointAccessor implements ITupleBufferPointAccessor {
> "AbstractTupleBufferPointAccessor" -> "AbstractTuplePointerAccessor"?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java:

Line 114: throw new IllegalAccessError("Please call deAllocateBuffer to return the used buffer.");
> Why throw an exception here?
> The annotation on the IFramePool interface sa
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java:

Line 30: void reset() throws HyracksDataException;
> Add a description, e.g., what's the difference with close()
Done

Line 58: boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
> switch the order between byte[] byteArray and int[] fieldEndOffsets, becaus
Done

Line 61: boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, int tid, TuplePointer tempPtr)
> Add some annotation for this method.
Done

Line 75: boolean insertTupleToSpilledPartition(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
> switch the order between byte[] byteArray and int[] fieldEndOffsets, becaus
Done

Line 81: void close();
> Describe the contract of this method?
Done

Line 83: ITupleBufferPointAccessor getTupleAccessor(RecordDescriptor recordDescriptor);
> Why do you need the recordDescriptor?
The ITuplePointerAccessor uses IFrameAccessor internally. In order to easily inherit the FrameAccessor I just copied the constructor parameter of the IFrameAccessor.

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java:

Line 31: public interface ITupleBufferPointAccessor extends IFrameTupleAccessor {
> ITupleBufferPointAccessor->IFrameTuplePointerAccessor?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java:

Line 41: public class VGroupTupleBufferManager implements IPartitionedTupleBufferManager {
> Why it is called "Group"? It makes me think this is sth. only used by the
Done

Line 109: public boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] byteArray, int start, int size,
> change the order of arguments?
Done

Line 132: public boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, int tid, TuplePointer tempPtr)
> pid->partition
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java:

Line 49: private static final int DEFAULT_TUPLE_PER_FRAME = 10;
> Where do you get this number? From the join operator?
I removed this default value and added the fileSize parameter all through the constructors of ExternalGroupby

Line 50: private static double factor = 1.1;
> "factor" --> "FUDGE_FACTOR"
Done

Line 59: public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize, long fileSize,
> fileSize -> dataByteSize ?
Done

Line 74: final FrameTuplePairComparator ftpcInputVSAggregate = new FrameTuplePairComparator(keyFields, aggregatedKeys,
> What does VSAggregate mean?
I changed it to `ftpcInputCompareToAggregate`

Line 81: outRecordDescriptor, keyFields, aggregatedKeys, null);
> aggregatedKeys --> intermediateKeys?
Done

Line 95: if (LOGGER.isLoggable(Level.FINE)) {
> The if-check seems not necessary.
It saves the string concatenation in the LOGGER.fine() calls when FINE logging is disabled.
I wish Java could provide some lazy initialization to avoid this extra step.

Line 105: final ISerializableTable metaTable = new SerializableHashTable(tableSize, ctx);
> metaTable --> hashTableForTuplePointers
Done

Line 112: private BitSet flushedSet = new BitSet(numPhysicalPartitions);
> "flushedSet" --> "spilledSet"
Done

Line 132: return logicalPartition % numPhysicalPartitions;
> Would that be better to let it be
Done

Line 139: private int getNextLogicalPartition(int curLogicalPartition) {
> The term "logical partition" and "physical partition" disturbs me a bit. I
Done

Line 248: int flushedPartition = findMaxSizeIdInFlushedPartitions();
> "findMaxSizeIdInFlushedPartitions()" -> "findSpilledPartitionWithLargestMem
Done

Line 250: return findMaxSizeIdInMemPartitions();
> "findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"
Done

Line 252: // if this maxSize partition is too small to flush, we need to find a better candidate
> "If the victim partition only have one in-memory page, we will try to find
We need to restrict each spilled partition to at most one page. Then the other partitions will have less chance of spilling out half-full frames, which avoids disk thrashing.

Line 254: && flushedSet.nextClearBit(0) >= 0) {
> Can you wrap "flushedSet.nextClearBit(0)>=0" as a private method with a mea
Done

Line 255: int max = findMaxSizeIdInMemPartitions();
> "findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"
Done

Line 295: private int getNumOfPartitions(int tableSize, int fileSizeInFrames, int memSize) {
> fileSizeInFrames --> numFramesForData ?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java:

Line 39: * @param tupleBuilder ?
> reorder the params and remove "?"
Done

Line 83: * @return TODO
> "TODO" ->...
Done

Line 99: * @return TODO
> "TODO" --> ....
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java:

Line 28: void close() throws HyracksDataException;
> Add some description for close(), clear(), and insert()?
Done

Line 51: * The {@code accessor} and {@code tIndex} given the reference to the tuple tobe inserted.
> "tobe" --> "to be"
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java:

Line 55: private ExternalHashGroupBy externalGroup;
> "externalGroup" --> "externalGroupBy"
Done

Line 101: //do nothing for failures
> It seems that we need a flag to indicate whether there is a failure in the
Done

Line 107: ctx.setStateObject(state);
> If there is a failure, we should not set the state.
Done

Line 120: externalGroup = null;
> In case there is a failure, we still want to cleanup the temporary run file
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java:

Line 42: public class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
> "ExternalGroupMergeOperatorNodePushable" --> "ExternalGroupWriteOperatorNod
Done

Line 115: ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, sizeInTuples[i],
> "sizeInTuples" --> "numOfTuples" ?
Done

Line 143: groupBy.build(frame.getBuffer());
> "build" --> "insert" ?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java:

Line 145: keyFields, firstNormalizerFactory, comparatorFactories, mergerFactory);
> mergeFactory--> intermediateAggregateFactory
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java:

Line 46: class GroupingHashTable {
> Should we remove that one just in case it makes people confusing. It is an
It seems that none of the HashGroupBuildOperatorDescriptor-related code is used anywhere in the plan, except in several test cases. Should we remove this "hash" folder?

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java:

Line 38: public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
> remove this one?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java:

Line 83: private final INullWriter[] nullWriters1;
> "nullWriters1" --> "nullWriters"
Done

Line 95: private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
> remove unused variable
Done

Line 98: private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoin.class.getName());
> remove unused variable
Done

Line 103: private TuplePointer tempPtr = new TuplePointer();
> It seems no where in this class reads tempPtr. If that's the case, can we r
It's true that nothing reads this tempPtr. But I think adding null-pointer handling logic inside insertTuple() is a little overkill for this harmless scenario. I added some comments to indicate the purpose of this pointer.

Line 163: if (spilledStatus.get(tid)) {
> it seems this should be pid, correct?
Done

Line 164: while (!bufferManager.insertTupleToSpilledPartition(pid, accessorBuild, tid, tempPtr)) {
> tempPtr --> null? And allow bufferManager.insertTupleToSpilledPartition to
I think the tempPtr does no harm. But adding the null logic to the insert method would introduce overhead.

Line 183: private int selectVictim(int pid) {
> Could the findVictim code be unified for group-by and join?
Done by adding AtMostOneFrameForSpilledPartitionPolicy

Line 186: return pid;
> Why this needs to be different from what group-by's find victim does?
The policy is the same. I unified those two parts with a policy class.

Line 188: int partitionToSpill = selectLargestSpilledPartition();
> selectLargestSpilledPartition()-> findSpilledPartitionWithMaxMemoryUsage?
Done

https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java:

Line 303: ctx.setStateObject(state);
> do not set the state for the case that a failure happens?
Done

Line 306: }
> clean up temporary files in case a failure happens?
Done

--
To view, visit https://asterix-gerrit.ics.uci.edu/398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Gerrit-PatchSet: 16
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Pouria Pirzadeh <[email protected]>
Gerrit-Reviewer: Preston Carman <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-HasComments: Yes
