Yingyi Bu has posted comments on this change.

Change subject: Add flush() to IFrameWriter
......................................................................


Patch Set 4:

(16 comments)

Mostly LGTM.

Just two general comments:
1. Let IFrameWriter provide a default implementation of flush() that throws an
exception. Java 8 (JDK 8) allows default methods in interfaces, so we can avoid
putting empty implementations everywhere. Also, those empty implementations
might make debugging harder later, e.g., when trying to figure out why a tuple
was not flushed to the end. Thus, I prefer to throw an exception, because we
never want flush() to be called at those places.

2. Let FrameTupleAppender deal with flushing the writer. Currently,
TupleAppender.flush() means writing whatever is left to the writer by calling
its nextFrame(). I suggest renaming the current TupleAppender.flush() to
TupleAppender.write(), and then adding a new TupleAppender.flush() that calls
TupleAppender.write() and then writer.flush(). This way, we don't need to call
tupleAppender.flush() followed by writer.flush() everywhere.
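A minimal sketch of the proposed split, assuming a simplified writer interface and appender. `AppenderWriter`, `TupleAppenderSketch`, and `CountingWriter` are hypothetical stand-ins, not the real IFrameWriter or FrameTupleAppender API: write() only hands the leftover frame to nextFrame(), while flush() does that and then calls writer.flush().

```java
import java.nio.ByteBuffer;

// Hypothetical minimal writer interface for this sketch (not the Hyracks IFrameWriter).
interface AppenderWriter {
    void nextFrame(ByteBuffer frame);

    void flush();
}

// Hypothetical appender showing the proposed split:
//   write(w): hand the partially filled frame to w.nextFrame()
//   flush(w): write(w), then w.flush()
class TupleAppenderSketch {
    private final ByteBuffer frame;
    private int tupleCount = 0;

    TupleAppenderSketch(int frameSize) {
        this.frame = ByteBuffer.allocate(frameSize);
    }

    // Appends one int "tuple"; returns false when the frame has no room left.
    boolean append(int value) {
        if (frame.remaining() < Integer.BYTES) {
            return false;
        }
        frame.putInt(value);
        tupleCount++;
        return true;
    }

    // Today's flush() semantics under the new name: push leftover tuples only.
    void write(AppenderWriter writer) {
        if (tupleCount > 0) {
            frame.flip();
            writer.nextFrame(frame);
            frame.clear();
            tupleCount = 0;
        }
    }

    // Proposed flush(): push leftover tuples, then flush the writer as well.
    void flush(AppenderWriter writer) {
        write(writer);
        writer.flush();
    }
}

// Test double that only counts calls.
class CountingWriter implements AppenderWriter {
    int frames = 0;
    int flushes = 0;

    @Override
    public void nextFrame(ByteBuffer frame) {
        frames++;
    }

    @Override
    public void flush() {
        flushes++;
    }
}
```

With this split, call sites that currently do tupleAppender.flush() followed by writer.flush() would make a single appender.flush(writer) call, while places that must not propagate a flush could still use write().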

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java:

Line 247:             outputWriter.flush();
Let the appender deal with writer.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java:

Line 178:                 writer.flush();
Let the appender deal with writer.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java:

Line 65:                 // This operator produces a single empty tuple and flush() doesn't make sense here
This is the pipeline starter, so calling writer.flush() here might be wanted.
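A hypothetical sketch of why a pipeline starter may want to flush: the source below pushes one frame into a stage that buffers frames until it is flushed. All names (`StageWriter`, `Sink`, `BufferingStage`, `SingleTupleSource`) are illustrative assumptions, not Hyracks classes, and the sketch ignores close(), which in a real pipeline would also push buffered data downstream eventually.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal writer interface for this sketch (not the Hyracks API).
interface StageWriter {
    void nextFrame(ByteBuffer frame);

    void flush();
}

// End of the pipeline: records what it receives.
class Sink implements StageWriter {
    final List<ByteBuffer> received = new ArrayList<>();
    int flushes = 0;

    @Override
    public void nextFrame(ByteBuffer frame) {
        received.add(frame);
    }

    @Override
    public void flush() {
        flushes++;
    }
}

// Holds frames until flushed, then forwards them and propagates the flush.
class BufferingStage implements StageWriter {
    private final StageWriter downstream;
    private final List<ByteBuffer> pending = new ArrayList<>();

    BufferingStage(StageWriter downstream) {
        this.downstream = downstream;
    }

    @Override
    public void nextFrame(ByteBuffer frame) {
        pending.add(frame);
    }

    @Override
    public void flush() {
        for (ByteBuffer f : pending) {
            downstream.nextFrame(f);
        }
        pending.clear();
        downstream.flush();
    }
}

// Pipeline starter: emits a single frame and optionally flushes afterwards.
class SingleTupleSource {
    static void run(StageWriter writer, boolean flushAfterPush) {
        ByteBuffer frame = ByteBuffer.allocate(4);
        frame.putInt(0).flip(); // placeholder bytes standing in for one empty tuple
        writer.nextFrame(frame);
        if (flushAfterPush) {
            writer.flush();
        }
    }
}
```

Calling run(stage, false) leaves the frame parked in the buffering stage; run(stage, true) delivers it to the sink right away.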


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java:

Line 74:             // This operator is a one frame push runtime and flush() doesn't make sense here
This is the nested pipeline starter, so writer.flush() seems to be needed here.


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java:

Line 117:                 for (int i = 0; i < outputArity; i++) {
The name tupleAppender.flush() is confusing: I thought it would also flush
the writer.

How about renaming the current tupleAppender.flush() to tupleAppender.write()
and adding a new tupleAppender.flush() that does tupleAppender.write() followed
by writer.flush()?


Line 120:                     writers[i].flush();
Could tupleAppender.flush(...) call writers[i].flush() internally?


Line 183:                 tupleAppender.reset(writeBuffers[outputIndex], false);
Why is this additional reset needed?


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java:

Line 168:                 writer.flush();
Could appender.flush() take care of flushing the writer?


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java:

Line 153:                 writer.flush();
Let the appender deal with writer.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java:

Line 166:                 writer.flush();
Let the appender deal with writer.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
File hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java:

Line 68:     public void flush() throws HyracksDataException;
Provide a default implementation that throws an exception, so that we can
avoid putting empty implementations everywhere.
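A minimal sketch of the suggested default method, assuming a simplified interface: `FrameWriterSketch`, `NonFlushingWriter`, and `FlushingWriter` are hypothetical, not the real IFrameWriter. The real method would presumably throw HyracksDataException; UnsupportedOperationException is used here only to keep the sketch self-contained.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for IFrameWriter (not the real Hyracks interface).
interface FrameWriterSketch {
    void nextFrame(ByteBuffer frame);

    // Java 8 default method: writers that never expect a flush inherit this
    // and fail loudly instead of silently ignoring the call.
    default void flush() {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support flush()");
    }
}

// Does not override flush(), so calling it throws.
class NonFlushingWriter implements FrameWriterSketch {
    int frames = 0;

    @Override
    public void nextFrame(ByteBuffer frame) {
        frames++;
    }
}

// Opts in to flushing by overriding the default.
class FlushingWriter implements FrameWriterSketch {
    int flushes = 0;

    @Override
    public void nextFrame(ByteBuffer frame) {
        // Frames are ignored in this sketch.
    }

    @Override
    public void flush() {
        flushes++;
    }
}
```

Only writers that actually need to pass a flush along would override it; an unexpected call anywhere else fails immediately instead of silently doing nothing.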


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
File hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java:

Line 125:         // This frame writer always pushes its content
So a flush cannot cross the network boundary?


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
File hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java:

Line 202:         // materialize operators should only send their output once all of their input has been consumed. hence, this is a no op
This is the materializing pipelined writer, which materializes the data and, at
the same time, writes it to the pipeline, so treating flush() as a no-op may
not be right here.
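A hypothetical sketch of the point above: a writer that materializes each frame and also forwards it downstream has data in flight on the pipeline side, so it could forward flush() rather than treat it as a no-op. `PartitionWriter`, `RecordingWriter`, and `MaterializeAndForwardWriter` are illustrative assumptions, not Hyracks classes; the in-memory list stands in for whatever storage the real partition writes to.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal writer interface for this sketch (not the Hyracks API).
interface PartitionWriter {
    void nextFrame(ByteBuffer frame);

    void flush();
}

// Downstream test double that counts calls.
class RecordingWriter implements PartitionWriter {
    int frames = 0;
    int flushes = 0;

    @Override
    public void nextFrame(ByteBuffer frame) {
        frames++;
    }

    @Override
    public void flush() {
        flushes++;
    }
}

// Materializes every frame and pushes it to the pipeline at the same time,
// so flush() is forwarded instead of being swallowed.
class MaterializeAndForwardWriter implements PartitionWriter {
    private final List<ByteBuffer> materialized = new ArrayList<>();
    private final PartitionWriter pipeline;

    MaterializeAndForwardWriter(PartitionWriter pipeline) {
        this.pipeline = pipeline;
    }

    @Override
    public void nextFrame(ByteBuffer frame) {
        // Copy the readable bytes without moving the caller's position.
        ByteBuffer copy = ByteBuffer.allocate(frame.remaining());
        copy.put(frame.duplicate());
        copy.flip();
        materialized.add(copy);
        pipeline.nextFrame(frame);
    }

    @Override
    public void flush() {
        pipeline.flush();
    }

    int materializedCount() {
        return materialized.size();
    }
}
```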


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java:

Line 96:         frameWriter.flush();
Move writer.flush() into appender.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java:

Line 158:             appenders[i].flush(pWriters[i], true);
Move writer.flush() into appender.flush().


https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java:

Line 151:             pWriters[i].flush();
Let appender.flush() deal with writer.flush().


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/584

Gerrit-MessageType: comment
Gerrit-Change-Id: I85424bab7965b71aac709280af066e1655457aa3
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
Gerrit-HasComments: Yes
