Yingyi Bu has posted comments on this change.

Change subject: Add flush() to IFrameWriter
......................................................................

Patch Set 4:

(16 comments)

Mostly LGTM. Just two comments:

1. Let IFrameWriter provide a default implementation of flush() that throws an exception. JDK 8 allows default implementations in interfaces, so we can avoid putting empty implementations everywhere. Those empty implementations might also make debugging harder later, e.g., when working out why a tuple was not flushed to the end. I prefer throwing an exception because we never want flush() to be called at those places.

2. Let FrameTupleAppender deal with flushing the writer. Currently TupleAppender.flush() means writing whatever is left to the writer by calling its nextFrame(). I suggest renaming the current TupleAppender.flush() to TupleAppender.write(), and then adding a new TupleAppender.flush() that calls TupleAppender.write() and writer.flush(). That way, we don't need to call tupleAppender.flush() and then writer.flush() everywhere.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java:

Line 247: outputWriter.flush();
Let the appender deal with writer.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java:

Line 178: writer.flush();
Let the appender deal with writer.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java:

Line 65: // This operator produces a single empty tuple and flush() doesn't make sense here
This is the pipeline starter, so writer.flush() might be wanted.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java:

Line 74: // This operator is a one frame push runtime and flush() doesn't make sense here
This is the nested pipeline starter, so writer.flush() seems to be needed.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java:

Line 117: for (int i = 0; i < outputArity; i++) {
The name tupleAppender.flush() sounds confusing -- I thought it would also flush the writer. Rename the current tupleAppender.flush() to tupleAppender.write() and add a new tupleAppender.flush(), which means tupleAppender.write() + writer.flush()?

Line 120: writers[i].flush();
Let tupleAppender.flush(...) internally call writers[i].flush()?

Line 183: tupleAppender.reset(writeBuffers[outputIndex], false);
Why is this additional reset needed?

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java:

Line 168: writer.flush();
Let appender.flush() deal with flushing the writer?

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java:

Line 153: writer.flush();
Let the appender deal with writer.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java:

Line 166: writer.flush();
Let the appender deal with writer.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
File hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java:

Line 68: public void flush() throws HyracksDataException;
Provide a default implementation that throws an exception, so we can avoid putting empty implementations everywhere.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
File hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java:

Line 125: // This frame writer always pushes its content
Flush cannot cross the network boundary?

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
File hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java:

Line 202: // materialize operators should only send their output once all of their input has been consumed. hence, this is a no op
This is a materializing pipelined writer, which materializes the data and at the same time writes the data to the pipeline.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java:

Line 96: frameWriter.flush();
Move writer.flush() into appender.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java:

Line 158: appenders[i].flush(pWriters[i], true);
Move writer.flush() into appender.flush().

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java:

Line 151: pWriters[i].flush();
Let appender.flush() deal with writer.flush().

--
To view, visit https://asterix-gerrit.ics.uci.edu/584
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85424bab7965b71aac709280af066e1655457aa3
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
Gerrit-HasComments: Yes
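
For illustration only, here is a minimal Java sketch of the two summary suggestions in this review: a default flush() on the writer interface that throws, and an appender whose write() pushes the buffered tuples while flush() does write() plus writer.flush(). Every type below (SketchFrameWriter, SketchDataException, SketchTupleAppender, RecordingWriter, NonFlushingWriter) is a hypothetical stand-in, not the real Hyracks IFrameWriter, HyracksDataException, or FrameTupleAppender, and frames are modelled as lists of strings rather than byte buffers.

```java
import java.util.ArrayList;
import java.util.List;

// Small driver; all names in this file are hypothetical stand-ins.
class FlushSketch {
    public static void main(String[] args) throws SketchDataException {
        RecordingWriter writer = new RecordingWriter();
        SketchTupleAppender appender = new SketchTupleAppender();
        appender.append("tuple-1");
        appender.flush(writer); // one call: write() followed by writer.flush()
        System.out.println("frames=" + writer.frames.size()
                + " flushes=" + writer.flushCount);
    }
}

// Stand-in for HyracksDataException (assumed here to be a checked exception).
class SketchDataException extends Exception {
    SketchDataException(String message) {
        super(message);
    }
}

// Stand-in for IFrameWriter, reduced to the two methods discussed.
interface SketchFrameWriter {
    void nextFrame(List<String> frame) throws SketchDataException;

    // Suggestion 1: a default that fails loudly, so writers that should
    // never be flushed need no empty override.
    default void flush() throws SketchDataException {
        throw new SketchDataException(
                "flush() is not supported by " + getClass().getName());
    }
}

// Suggestion 2: split the appender's old flush() into write() and flush().
class SketchTupleAppender {
    private final List<String> buffer = new ArrayList<>();

    void append(String tuple) {
        buffer.add(tuple);
    }

    // What the review calls the current flush(): hand whatever is buffered
    // to the writer via nextFrame(), without flushing the writer.
    void write(SketchFrameWriter writer) throws SketchDataException {
        if (!buffer.isEmpty()) {
            writer.nextFrame(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // The proposed new flush(): write() plus writer.flush().
    void flush(SketchFrameWriter writer) throws SketchDataException {
        write(writer);
        writer.flush();
    }
}

// A writer that supports flushing and records what it receives.
class RecordingWriter implements SketchFrameWriter {
    final List<List<String>> frames = new ArrayList<>();
    int flushCount = 0;

    @Override
    public void nextFrame(List<String> frame) {
        frames.add(frame);
    }

    @Override
    public void flush() {
        flushCount++;
    }
}

// A writer that keeps the throwing default.
class NonFlushingWriter implements SketchFrameWriter {
    @Override
    public void nextFrame(List<String> frame) {
        // accepts frames but deliberately does not override flush()
    }
}
```

With this shape, call sites that currently pair tupleAppender.flush() with writer.flush() would make a single appender.flush(writer) call, and a flush that reaches a writer like NonFlushingWriter surfaces as an exception instead of silently doing nothing.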
