Jianfeng Jia has posted comments on this change.

Change subject: Implemented the memory-bounded HashGroupby and HashJoin for 
BigObject
......................................................................


Patch Set 16:

(58 comments)

https://asterix-gerrit.ics.uci.edu/#/c/398/16/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
File 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java:

Line 71:                             new VariableFramePool(ctx, 
VariableFramePool.UNLIMITED_MEMORY),
> why unlimited memory?
Originally, this part of sort will allocate as many as possible frames from 
ctx. I think it assumes all the record should be sorted at once. Thus, in the 
buffer-manager changes I just give it unlimited budget.


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
File 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java:

Line 129:         System.err.flush();
> Do not flush?
Done


Line 150:                 sb.append("ExceptionWhenDeserialzingField ");
> better error message?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
File 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java:

Line 28:     public GeneratedRunFileReader(FileReference file, IIOManager 
ioManager, long size, boolean deleteAfterRead,
> remove "public" since it is supposed to be only be "generated" by a file wr
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java:

Line 27: public abstract class AbstractTupleBufferPointAccessor implements 
ITupleBufferPointAccessor {
> "AbstractTupleBufferPointAccessor" -> "AbstractTuplePointerAccessor"?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java:

Line 114:         throw new IllegalAccessError("Please call deAllocateBuffer to 
return the used buffer.");
> Why throw an exception here?  The annotation on the IFramePool interface sa
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java:

Line 30:     void reset() throws HyracksDataException;
> Add a description, e.g., what's the difference with close()
Done


Line 58:     boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] 
byteArray, int start, int size,
> switch the order between byte[] byteArray and int[] fieldEndOffsets, becaus
Done


Line 61:     boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, 
int tid, TuplePointer tempPtr)
> Add some annotation for this method.
Done


Line 75:     boolean insertTupleToSpilledPartition(int partition, int[] 
fieldEndOffsets, byte[] byteArray, int start, int size,
> switch the order between byte[] byteArray and int[] fieldEndOffsets, becaus
Done


Line 81:     void close();
> Describe the contract of this method?
Done


Line 83:     ITupleBufferPointAccessor getTupleAccessor(RecordDescriptor 
recordDescriptor);
> Why do you need the recordDescriptor?
The ITuplePointerAccessor uses IFrameAccessor internally. In order to easily 
inherit the FrameAccessor I just copied the constructor parameter of the 
IFrameAccessor.


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java:

Line 31: public interface ITupleBufferPointAccessor extends IFrameTupleAccessor 
{
> ITupleBufferPointAccessor->IFrameTuplePointerAccessor?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java:

Line 41: public class VGroupTupleBufferManager implements 
IPartitionedTupleBufferManager {
> Why it is called "Group"?  It makes me think this is sth. only used by the 
Done


Line 109:     public boolean insertTuple(int partition, int[] fieldEndOffsets, 
byte[] byteArray, int start, int size,
> change the order of arguments?
Done


Line 132:     public boolean insertTuple(int pid, IFrameTupleAccessor 
accessorBuild, int tid, TuplePointer tempPtr)
> pid->partition
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java:

Line 49:     private static final int DEFAULT_TUPLE_PER_FRAME = 10;
> Where do you get this number?  From the join operator?
I removed this default value and add the fileSize parameter all through the 
constructors of ExternalGroupby


Line 50:     private static double factor = 1.1;
> "factor" --> "FUDGE_FACTOR"
Done


Line 59:     public ISpillableTable buildSpillableTable(final 
IHyracksTaskContext ctx, int suggestTableSize, long fileSize,
> fileSize -> dataByteSize ?
Done


Line 74:         final FrameTuplePairComparator ftpcInputVSAggregate = new 
FrameTuplePairComparator(keyFields, aggregatedKeys,
> What does VSAggregate mean?
I changed it to `ftpcInputCompareToAggregate`


Line 81:                 outRecordDescriptor, keyFields, aggregatedKeys, null);
> aggregatedKeys --> intermediateKeys?
Done


Line 95:         if (LOGGER.isLoggable(Level.FINE)) {
> The if-check seems not necessary.
It can save the string concat in the Logging.Fine functions.
I wish Java can provide some lazy initialization to avoid this extra step.


Line 105:         final ISerializableTable metaTable = new 
SerializableHashTable(tableSize, ctx);
> metaTable --> hashTableForTuplePointers
Done


Line 112:             private BitSet flushedSet = new 
BitSet(numPhysicalPartitions);
> "flushedSet" --> "spilledSet"
Done


Line 132:                 return logicalPartition % numPhysicalPartitions;
> Would that be better to let it be
Done


Line 139:             private int getNextLogicalPartition(int 
curLogicalPartition) {
> The term "logical partition" and "physical partition" disturbs me a bit.  I
Done


Line 248:                 int flushedPartition = 
findMaxSizeIdInFlushedPartitions();
> "findMaxSizeIdInFlushedPartitions()" -> "findSpilledPartitionWithLargestMem
Done


Line 250:                     return findMaxSizeIdInMemPartitions();
> "findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"
Done


Line 252:                 // if this maxSize partition is too small to flush, 
we need to find a better candidate
> "If the victim partition only have one in-memory page, we will try to find 
We need to restrict at most one page for a spilled partition. Then the other 
partition will have less chance to spilled out the half-full frames. Finally it 
can avoid disk thrashing.


Line 254:                         && flushedSet.nextClearBit(0) >= 0) {
> Can you wrap "flushedSet.nextClearBit(0)>=0" as a private method with a mea
Done


Line 255:                     int max = findMaxSizeIdInMemPartitions();
> "findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"
Done


Line 295:     private int getNumOfPartitions(int tableSize, int 
fileSizeInFrames, int memSize) {
> fileSizeInFrames --> numFramesForData ?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java:

Line 39:      * @param tupleBuilder ?
> reorder the params and remove "?"
Done


Line 83:      * @return TODO
> "TODO" ->...
Done


Line 99:      * @return TODO
> "TODO" --> ....
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java:

Line 28:     void close() throws HyracksDataException;
> Add some description for close(), clear(), and insert()?
Done


Line 51:      * The {@code accessor} and {@code tIndex} given the reference to 
the tuple tobe inserted.
> "tobe" --> "to be"
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java:

Line 55:     private ExternalHashGroupBy externalGroup;
> "externalGroup" --> "externalGroupBy"
Done


Line 101:         //do nothing for failures
> It seems that we need a flag to indicate whether there is a failure in the 
Done


Line 107:         ctx.setStateObject(state);
> If there is a failure, we should not set the state.
Done


Line 120:         externalGroup = null;
> In case there is a failure, we still want to cleanup the temporary run file
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java:

Line 42: public class ExternalGroupMergeOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable
> "ExternalGroupMergeOperatorNodePushable" --> "ExternalGroupWriteOperatorNod
Done


Line 115:                 ISpillableTable partitionTable = 
spillableTableFactory.buildSpillableTable(ctx, sizeInTuples[i],
> "sizeInTuples" --> "numOfTuples" ?
Done


Line 143:                 groupBy.build(frame.getBuffer());
> "build" --> "insert" ?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java:

Line 145:                     keyFields, firstNormalizerFactory, 
comparatorFactories, mergerFactory);
> mergeFactory--> intermediateAggregateFactory
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java:

Line 46: class GroupingHashTable {
> Should we remove that one just in case it makes people confusing. It is an 
It seems all the HashGroupBuildOperatorDescriptor related code doesn't be used 
anywhere in the plan, except in several test cases. Should we remove this 
"hash" folder?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java:

Line 38: public class HashGroupOperatorDescriptor extends 
AbstractOperatorDescriptor {
> remove this one?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java:

Line 83:     private final INullWriter[] nullWriters1;
> "nullWriters1" --> "nullWriters"
Done


Line 95:     private boolean isTableEmpty; //Added for handling the case, where 
build side is empty (tableSize is 0)
> remove unused variable
Done


Line 98:     private static final Logger LOGGER = 
Logger.getLogger(OptimizedHybridHashJoin.class.getName());
> remove unused variable
Done


Line 103:     private TuplePointer tempPtr = new TuplePointer();
> It seems no where in this class reads tempPtr. If that's the case, can we r
It's true that nowhere is using this tempPtr. But I think to add the null ptr 
handle logic inside the insertTuple() is a little overkill to solve this no 
harm scenario. I added some comments to indicate the purpose of this Ptr.


Line 163:         if (spilledStatus.get(tid)) {
> it seems this should be pid, correct?
Done


Line 164:             while (!bufferManager.insertTupleToSpilledPartition(pid, 
accessorBuild, tid, tempPtr)) {
> tempPtr --> null?  And allow bufferManager.insertTupleToSpilledPartition to
I think the tempPtr is no harm. But adding the null logic in insert method will 
introduce overhead.


Line 183:     private int selectVictim(int pid) {
> Could the findVictim code be unified for group-by and join?
Done by adding AtMostOneFrameForSpilledPartitionPolicy


Line 186:             return pid;
> Why this needs to be different from what group-by's find victim does?
The policy is same. I unified those two parts by a policy class.


Line 188:         int partitionToSpill = selectLargestSpilledPartition();
> selectLargestSpilledPartition()-> findSpilledPartitionWithMaxMemoryUsage?
Done


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java:

Line 303:                     ctx.setStateObject(state);
> do not set the state for the case that a failure happens?
Done


Line 306:                     }
> clean up temporary files in case a failure happens?
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Gerrit-PatchSet: 16
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Pouria Pirzadeh <[email protected]>
Gerrit-Reviewer: Preston Carman <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-HasComments: Yes

Reply via email to