abdullah alamoudi has posted comments on this change.

Change subject: Add flush() to IFrameWriter
......................................................................
Patch Set 4:

(16 comments)

Addressed all of the comments. I want to be careful with throwing exceptions from the flush() call, since there are cases where flush() is not supposed to do anything but is still expected to be called. For example, a sink operator is not expected to flush() anything, but the previous operator might not be aware that it is a sink, and we don't want every operator to have to validate the next operator to make sure it can flush its content.

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java:

Line 247: outputWriter.flush();
> let appender deals with writer.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java:

Line 178: writer.flush();
> let appender deals with writer.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java:

Line 65: // This operator produces a single empty tuple and flush() doesn't make sense here
> This is the pipeline starter, therefore writer.flush() might be wanted.
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java:

Line 74: // This operator is a one frame push runtime and flush() doesn't make sense here
> this is the nested pipeline starter, therefore writer.flush() seems needed.
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java:

Line 117: for (int i = 0; i < outputArity; i++) {
> The name tupleAppender.flush() sounds confusing -- I thought i will also fl
Done

Line 120: writers[i].flush();
> let tupleAppender.flush(...) internally call writers[i].flush()?
Done (a rough sketch of this pattern is included below, after the Line 183 reply).

Line 183: tupleAppender.reset(writeBuffers[outputIndex], false);
> Why this additional reset is needed?
The reset is needed because, in the implementation of FrameUtils.appendToWriter, the FrameAppender first tries to insert the tuple into the frame currently associated with it if there is enough space, and that frame might be the frame of another writer. Hence, we need to reset the appender with the frame of the output writer before we call FrameUtils.appendToWriter.
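To make that concrete, here is a rough sketch of the reset-then-append pattern inside a multi-output push runtime. It is illustrative only: the field types and the exact FrameUtils.appendToWriter overload are approximations, not necessarily what the patch itself uses.

  // Sketch only: types and the appendToWriter overload are approximate.
  private IFrameWriter[] writers;            // one writer per output branch
  private IFrame[] writeBuffers;             // one frame per output branch
  private FrameTupleAppender tupleAppender;  // a single appender shared across outputs

  private void appendToOutput(int outputIndex, IFrameTupleAccessor accessor, int tIndex)
          throws HyracksDataException {
      // The shared appender may still be bound to another output's frame from an
      // earlier call. Re-bind it to this output's frame first; otherwise
      // appendToWriter could place the tuple into (and later flush) the wrong
      // writer's frame.
      tupleAppender.reset(writeBuffers[outputIndex], false);
      FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, accessor, tIndex);
  }

In other words, the extra reset is what keeps the single shared appender consistent with whichever output frame is currently being targeted.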
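And regarding the recurring "let the appender deal with writer.flush()" comments (e.g. the Line 120 suggestion above), the intent is roughly the following. This is a conceptual sketch of an appender method, not the exact FrameTupleAppender code; getTupleCount() and write(...) stand in for whatever the appender uses internally to hand its current frame to the writer.

  // Sketch only: what appender.flush(writer, true) is expected to do after this change.
  public void flush(IFrameWriter writer, boolean clearFrame) throws HyracksDataException {
      // Push any buffered tuples in the current frame to the writer first ...
      if (getTupleCount() > 0) {
          write(writer, clearFrame);
      }
      // ... then flush the writer itself, so call sites no longer need a separate
      // writer.flush() after appender.flush(...).
      writer.flush();
  }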
https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java:

Line 168: writer.flush();
> let appender.flush() deals with flush the writer?
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java:

Line 153: writer.flush();
> let appender deals with writer.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
File algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java:

Line 166: writer.flush();
> let appender deals with writer.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
File hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java:

Line 68: public void flush() throws HyracksDataException;
> provide a default implementation for that to throw an exception, therefore
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
File hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java:

Line 125: // This frame writer always pushes its content
> Flush cannot cross the network boundary?
Here is how it works: flush() is a task-level operation, since there is no way to propagate a flush through the network without sending some data. So what happens after the network? The frame receiver at the network boundary flushes its content in turn whenever its buffer becomes empty. Otherwise, we would force the input channel reader to inspect frames, or use RMI to flush frames, and that creates a bit of overhead. It is similar to fail() in the sense that it doesn't cross task boundaries; you call it on each task separately.
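To illustrate the receiver-side behavior described above, here is a rough sketch of the idea. It is not the actual Hyracks input-channel reader; the queue-based plumbing (receivedFrames, the END_OF_STREAM sentinel, ReceiverSketch) is purely hypothetical, while IFrameWriter.nextFrame(...) and the new flush() are the interface methods under discussion.

  import java.nio.ByteBuffer;
  import java.util.concurrent.BlockingQueue;
  import org.apache.hyracks.api.comm.IFrameWriter;
  import org.apache.hyracks.api.exceptions.HyracksDataException;

  // Sketch only: forward frames received from the network and flush the downstream
  // writer whenever everything buffered so far has been drained, so that flush()
  // itself never has to travel over the wire.
  class ReceiverSketch {
      static final ByteBuffer END_OF_STREAM = ByteBuffer.allocate(0); // hypothetical sentinel

      void forwardFrames(BlockingQueue<ByteBuffer> receivedFrames, IFrameWriter downstream)
              throws HyracksDataException, InterruptedException {
          while (true) {
              ByteBuffer frame = receivedFrames.take();
              if (frame == END_OF_STREAM) {
                  break;
              }
              downstream.nextFrame(frame);
              if (receivedFrames.isEmpty()) {
                  // Nothing more is buffered from the network right now, so push what
                  // we have downstream instead of holding it until more data arrives.
                  downstream.flush();
              }
          }
      }
  }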
https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
File hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java:

Line 202: // materialize operators should only send their output once all of their input has been consumed. hence, this is a no op
> This is materialized pipelined writer, which materializes the data and at t
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java:

Line 96: frameWriter.flush();
> move writer.flush() into appender.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java:

Line 158: appenders[i].flush(pWriters[i], true);
> move writer.flush() into appender.flush()
Done

https://asterix-gerrit.ics.uci.edu/#/c/584/4/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java:

Line 151: pWriters[i].flush();
> let appender.flush() deals with writer.flush()
Done

--
To view, visit https://asterix-gerrit.ics.uci.edu/584
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85424bab7965b71aac709280af066e1655457aa3
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
Gerrit-HasComments: Yes
