Yingyi Bu has posted comments on this change.

Change subject: Implemented the memory-bounded HashGroupby and HashJoin for 
BigObject
......................................................................


Patch Set 16:

(59 comments)

https://asterix-gerrit.ics.uci.edu/#/c/398/16/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
File 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java:

Line 71:                             new VariableFramePool(ctx, 
VariableFramePool.UNLIMITED_MEMORY),
why unlimited memory?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
File 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java:

Line 129:         System.err.flush();
Do not flush?


Line 150:                 sb.append("ExceptionWhenDeserialzingField ");
better error message?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java
File 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/GeneratedRunFileReader.java:

Line 28:     public GeneratedRunFileReader(FileReference file, IIOManager 
ioManager, long size, boolean deleteAfterRead,
Remove "public", since it is supposed to be "generated" only by a file writer.


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleBufferPointAccessor.java:

Line 27: public abstract class AbstractTupleBufferPointAccessor implements 
ITupleBufferPointAccessor {
"AbstractTupleBufferPointAccessor" -> "AbstractTuplePointerAccessor"?
"ITupleBufferPointAccessor" -> "ITuplePointerAccessor"?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java:

Line 114:         throw new IllegalAccessError("Please call deAllocateBuffer to 
return the used buffer.");
Why throw an exception here? The annotation on the IFramePool interface says 
"Reset the counters to initial status".


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java:

Line 30:     void reset() throws HyracksDataException;
Add a description, e.g., how it differs from close().


Line 58:     boolean insertTuple(int partition, int[] fieldEndOffsets, byte[] 
byteArray, int start, int size,
Switch the order of byte[] byteArray and int[] fieldEndOffsets, because 
fieldEndOffsets is essentially a descriptor of one tuple and would be better 
placed together with start and size.


Line 61:     boolean insertTuple(int pid, IFrameTupleAccessor accessorBuild, 
int tid, TuplePointer tempPtr)
Add some annotation for this method.
Maybe, unify the parameter names, e.g.,
pid->partition
tempPtr -> pointer

Also,
accessorBuild -> tupleAccessor


Line 75:     boolean insertTupleToSpilledPartition(int partition, int[] 
fieldEndOffsets, byte[] byteArray, int start, int size,
Switch the order of byte[] byteArray and int[] fieldEndOffsets, because 
fieldEndOffsets is essentially a descriptor of one tuple and would be better 
placed together with start and size.


Line 81:     void close();
Describe the contract of this method?


Line 83:     ITupleBufferPointAccessor getTupleAccessor(RecordDescriptor 
recordDescriptor);
Why do you need the recordDescriptor?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferPointAccessor.java:

Line 31: public interface ITupleBufferPointAccessor extends IFrameTupleAccessor 
{
ITupleBufferPointAccessor->IFrameTuplePointerAccessor?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VGroupTupleBufferManager.java:

Line 41: public class VGroupTupleBufferManager implements 
IPartitionedTupleBufferManager {
Why is it called "Group"?  It makes me think this is something only used by the 
group-by operator, but it is also used by the join operator.
Maybe "Group" -> "Partition"?


Line 109:     public boolean insertTuple(int partition, int[] fieldEndOffsets, 
byte[] byteArray, int start, int size,
change the order of arguments?


Line 132:     public boolean insertTuple(int pid, IFrameTupleAccessor 
accessorBuild, int tid, TuplePointer tempPtr)
pid->partition

accessorBuild -> tupleAccessor

tempPtr -> pointer


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java:

Line 49:     private static final int DEFAULT_TUPLE_PER_FRAME = 10;
Where do you get this number?  From the join operator?


Line 50:     private static double factor = 1.1;
"factor" --> "FUDGE_FACTOR"
"static" --> "static final"


Line 59:     public ISpillableTable buildSpillableTable(final 
IHyracksTaskContext ctx, int suggestTableSize, long fileSize,
fileSize -> dataByteSize ?


Line 74:         final FrameTuplePairComparator ftpcInputVSAggregate = new 
FrameTuplePairComparator(keyFields, aggregatedKeys,
What does VSAggregate mean?


Line 81:                 outRecordDescriptor, keyFields, aggregatedKeys, null);
aggregatedKeys --> intermediateKeys?


Line 95:         if (LOGGER.isLoggable(Level.FINE)) {
The if-check seems not necessary.


Line 105:         final ISerializableTable metaTable = new 
SerializableHashTable(tableSize, ctx);
metaTable --> hashTableForTuplePointers


Line 112:             private BitSet flushedSet = new 
BitSet(numPhysicalPartitions);
"flushedSet" --> "spilledSet"


Line 132:                 return logicalPartition % numPhysicalPartitions;
Would it be better to use
logicalPartition / numPhysicalPartitions?

Then pointers for the same partition would be physically aligned together,
and iterating over a partition could potentially be more efficient.
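
To make the difference concrete, here is a minimal sketch of the two mappings. The class and method names are made up for illustration and are not in the patch. I am also assuming that the contiguous variant should divide by the number of entries per partition (so the result stays below numPhysicalPartitions); that may not be exactly what the expression above gives.

```java
// Hypothetical helper, not part of the patch: compares the two mappings.
final class PartitionMapping {
    // Behavior at line 132: neighboring hash-table entries land in different partitions.
    static int interleaved(int logicalPartition, int numPhysicalPartitions) {
        return logicalPartition % numPhysicalPartitions;
    }

    // Assumed intent of the suggestion: each partition owns one contiguous range of entries.
    static int contiguous(int logicalPartition, int tableSize, int numPhysicalPartitions) {
        // Ceiling division, so the last partition absorbs a shorter range if needed.
        int entriesPerPartition = (tableSize + numPhysicalPartitions - 1) / numPhysicalPartitions;
        return logicalPartition / entriesPerPartition;
    }
}
```

With tableSize = 8 and 4 partitions, entries 0..7 map to 0,1,2,3,0,1,2,3 under the modulo scheme and to 0,0,1,1,2,2,3,3 under the contiguous one.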


Line 139:             private int getNextLogicalPartition(int 
curLogicalPartition) {
The terms "logical partition" and "physical partition" confuse me a bit.  I 
spent some effort figuring out what they actually are.

It looks like they actually have standard names:

1. "logical partition" -> "entryInHashTable"

2. "physical partition" --> "partition"


Line 248:                 int flushedPartition = 
findMaxSizeIdInFlushedPartitions();
"findMaxSizeIdInFlushedPartitions()" -> 
"findSpilledPartitionWithLargestMemoryUsage"?


Line 250:                     return findMaxSizeIdInMemPartitions();
"findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"


Line 252:                 // if this maxSize partition is too small to flush, 
we need to find a better candidate
Maybe rephrase the description:

"If the victim partition only has one in-memory partition, we need to find 
another victim.  We need to maintain at least one page for a spilled partition 
to avoid disk thrashing."

(Maybe my understanding is wrong.)


Line 254:                         && flushedSet.nextClearBit(0) >= 0) {
Can you wrap "flushedSet.nextClearBit(0) >= 0" as a private method with a 
meaningful name, e.g., existsInMemoryPartitions()?  This is the only place in 
findVictimPartition that refers to flushedSet.  It would be nice to factor out 
the details of flushedSet.
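
One caveat while factoring this out: java.util.BitSet.nextClearBit never returns a negative index, so "flushedSet.nextClearBit(0) >= 0" is always true. Below is a minimal sketch of the helper, assuming the check is meant to ask whether any partition is still unspilled. The wrapper class SpilledPartitions and its markSpilled method are hypothetical names, not from the patch.

```java
import java.util.BitSet;

// Hypothetical wrapper around the spilled-partition bitmap, for illustration only.
final class SpilledPartitions {
    private final BitSet flushedSet;
    private final int numPartitions;

    SpilledPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
        this.flushedSet = new BitSet(numPartitions);
    }

    void markSpilled(int partition) {
        flushedSet.set(partition);
    }

    // True if at least one partition has not been spilled yet.
    // nextClearBit always returns an index >= 0, so compare it with the partition count.
    boolean existsInMemoryPartitions() {
        return flushedSet.nextClearBit(0) < numPartitions;
    }
}
```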


Line 255:                     int max = findMaxSizeIdInMemPartitions();
"findMaxSizeIdInMemPartitions" --> "findLargestInMemPartition"


Line 295:     private int getNumOfPartitions(int tableSize, int 
fileSizeInFrames, int memSize) {
fileSizeInFrames --> numFramesForData ?
memSize --> frameLimit ?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptor.java:

Line 39:      * @param tupleBuilder ?
reorder the params and remove "?"


Line 83:      * @return TODO
"TODO" ->...


Line 99:      * @return TODO
"TODO" --> ....


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTable.java:

Line 28:     void close() throws HyracksDataException;
Add some description for close(), clear(), and insert()?


Line 51:      * The {@code accessor} and {@code tIndex} given the reference to 
the tuple tobe inserted.
"tobe" --> "to be"


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java:

Line 55:     private ExternalHashGroupBy externalGroup;
"externalGroup" --> "externalGroupBy"


Line 101:         //do nothing for failures
It seems that we need a flag to indicate whether there is a failure in the data 
producer of this operator.


Line 107:         ctx.setStateObject(state);
If there is a failure, we should not set the state.


Line 120:         externalGroup = null;
In case there is a failure, we still want to clean up the temporary run files.
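
Roughly what I have in mind for the three comments above, as a sketch only. BuildOperatorSketch and its members are hypothetical stand-ins; the list of strings stands in for the run files and the boolean stands in for ctx.setStateObject(state).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the build operator; names do not match the patch.
final class BuildOperatorSketch {
    private boolean failed = false;
    private boolean statePublished = false;
    private final List<String> runFiles = new ArrayList<>();

    // Records a temporary run file created while spilling.
    void spill(String runFile) {
        runFiles.add(runFile);
    }

    // Called when the upstream data producer fails: remember it instead of doing nothing.
    void fail() {
        failed = true;
    }

    void close() {
        if (failed) {
            // Stands in for deleting the temporary run files.
            runFiles.clear();
        } else {
            // Stands in for ctx.setStateObject(state): only publish on success.
            statePublished = true;
        }
    }

    boolean isStatePublished() {
        return statePublished;
    }

    int pendingRunFiles() {
        return runFiles.size();
    }
}
```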


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java:

Line 42: public class ExternalGroupMergeOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable
"ExternalGroupMergeOperatorNodePushable" --> 
"ExternalGroupWriteOperatorNodePushable"

Because we don't do "merge" here. 
"Write" might be the right term since it writes out all group-by states.


Line 96:             writer.fail();
In case there is a failure in its data consumer, we should still clean up 
leftover run files if any.


Line 115:                 ISpillableTable partitionTable = 
spillableTableFactory.buildSpillableTable(ctx, sizeInTuples[i],
"sizeInTuples" --> "numOfTuples" ?


Line 143:                 groupBy.build(frame.getBuffer());
"build" --> "insert" ?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java:

Line 145:                     keyFields, firstNormalizerFactory, 
comparatorFactories, mergerFactory);
mergerFactory --> intermediateAggregateFactory


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java:

Line 46: class GroupingHashTable {
Should we remove this one, in case it confuses people? It is an 
object-based implementation that shouldn't ever be used.


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java:

Line 38: public class HashGroupOperatorDescriptor extends 
AbstractOperatorDescriptor {
remove this one?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java:

Line 83:     private final INullWriter[] nullWriters1;
"nullWriters1" --> "nullWriters"


Line 95:     private boolean isTableEmpty; //Added for handling the case, where 
build side is empty (tableSize is 0)
remove unused variable


Line 98:     private static final Logger LOGGER = 
Logger.getLogger(OptimizedHybridHashJoin.class.getName());
remove unused variable


Line 103:     private TuplePointer tempPtr = new TuplePointer();
It seems nowhere in this class reads tempPtr. If that's the case, can we 
remove this and just pass null for insertTuple()?


Line 163:         if (spilledStatus.get(tid)) {
it seems this should be pid, correct?


Line 164:             while (!bufferManager.insertTupleToSpilledPartition(pid, 
accessorBuild, tid, tempPtr)) {
tempPtr --> null?  And allow bufferManager.insertTupleToSpilledPartition to 
deal with the case where tempPtr is null?


Line 183:     private int selectVictim(int pid) {
Could the findVictim code be unified for group-by and join?
Can we factor this logic out as something like MemoryManagementPolicy, which 
holds a reference to the IPartitionedTupleBufferManager and contains the 
spilledStatus BitSet internally?
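
A rough sketch of the shape I am thinking of. Everything here is hypothetical: PartitionMemoryView is a cut-down stand-in for the buffer manager interface, and the victim rule (largest spilled partition that still holds memory, otherwise the largest in-memory partition) is my reading of the group-by side, not a verified copy of it.

```java
import java.util.BitSet;

// Hypothetical, cut-down view of the buffer manager; the real interface has more methods.
interface PartitionMemoryView {
    int getNumPartitions();

    int getMemoryUsage(int partition);
}

// Hypothetical policy object that group-by and join could share.
final class MemoryManagementPolicy {
    private final PartitionMemoryView bufferManager;
    private final BitSet spilledStatus;

    MemoryManagementPolicy(PartitionMemoryView bufferManager) {
        this.bufferManager = bufferManager;
        this.spilledStatus = new BitSet(bufferManager.getNumPartitions());
    }

    void markSpilled(int partition) {
        spilledStatus.set(partition);
    }

    // Returns the partition to spill next, or -1 if no partition holds any memory.
    int findVictimPartition() {
        int victim = largestPartition(true);
        return victim >= 0 ? victim : largestPartition(false);
    }

    // Largest partition (by memory usage) among spilled or unspilled ones; -1 if none uses memory.
    private int largestPartition(boolean spilled) {
        int best = -1;
        int bestSize = 0;
        for (int p = 0; p < bufferManager.getNumPartitions(); p++) {
            int size = bufferManager.getMemoryUsage(p);
            if (spilledStatus.get(p) == spilled && size > bestSize) {
                best = p;
                bestSize = size;
            }
        }
        return best;
    }
}
```

The operators would then only call findVictimPartition() and markSpilled(), and the BitSet would no longer leak into either of them.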


Line 186:             return pid;
Why does this need to be different from what group-by's find victim does?
Is this very important according to the experiments?


Line 188:         int partitionToSpill = selectLargestSpilledPartition();
selectLargestSpilledPartition()-> findSpilledPartitionWithMaxMemoryUsage?


https://asterix-gerrit.ics.uci.edu/#/c/398/16/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
File 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java:

Line 303:                     ctx.setStateObject(state);
do not set the state for the case that a failure happens?


Line 306:                     }
clean up temporary files in case a failure happens?


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Gerrit-PatchSet: 16
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Pouria Pirzadeh <[email protected]>
Gerrit-Reviewer: Preston Carman <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-HasComments: Yes
