Abdullah Alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/584

Change subject: Add flush() to IFrameWriter
......................................................................

Add flush() to IFrameWriter

This change introduces a flush() method on frame writers. It is
intended to be used by long-running jobs (Feeds) to flush the
contents of frames all the way to storage.
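
A typical implementation, as in AssignRuntimeFactory below, pushes any
tuples still buffered in the operator's frame appender and then flushes
the downstream writer. A minimal sketch of that pattern (here, appender
and writer are the operator's FrameTupleAppender and downstream
IFrameWriter, as used in this patch):

    @Override
    public void flush() throws HyracksDataException {
        // Forward any tuples still buffered in this operator's appender,
        // then ask the downstream writer to flush as well so in-flight
        // data can reach storage.
        if (appender.getTupleCount() > 0) {
            appender.flush(writer, true);
        }
        writer.flush();
    }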

Change-Id: I85424bab7965b71aac709280af066e1655457aa3
---
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
M hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
M hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
M hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
M hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
M hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
M hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
M hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
M hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
M hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
M hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
66 files changed, 455 insertions(+), 128 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/84/584/1

diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index bafe8a7..d5e9313 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -142,6 +142,10 @@
                     writer.fail();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index fefc72e..3b7b321 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -54,8 +54,7 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys) throws HyracksDataException {
 
-        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length,
-                decorFieldIdx.length);
+        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
             try {
@@ -132,8 +131,8 @@
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) throws 
HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be 
called");
             }
 
@@ -231,6 +230,10 @@
         @Override
         public void fail() throws HyracksDataException {
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 2ca933a..a6d1e19 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -127,8 +127,8 @@
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) throws 
HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be 
called");
             }
 
@@ -219,8 +219,8 @@
                 for (int f = 0; f < w; f++) {
                     tb.addField(accessor, tIndex, f);
                 }
-                FrameUtils.appendToWriter(outputWriter, outputAppender, 
tb.getFieldEndOffsets(),
-                        tb.getByteArray(), 0, tb.getSize());
+                FrameUtils.appendToWriter(outputWriter, outputAppender, 
tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                        tb.getSize());
             }
         }
 
@@ -241,6 +241,10 @@
         public void fail() throws HyracksDataException {
         }
 
+        @Override
+        public void flush() throws HyracksDataException {
+        }
+
     }
 
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index 69876b0..603c5b3 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -57,6 +57,10 @@
             @Override
             public void close() throws HyracksDataException {
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index b7a73d4..2368a9c 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
@@ -90,6 +89,11 @@
             public void close() throws HyracksDataException {
                 pgw.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                pgw.flush();
+            }
         };
 
     }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 8322cdc..f2afe98 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -157,6 +157,11 @@
             public void fail() throws HyracksDataException {
                 startOfPipeline.fail();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                startOfPipeline.flush();
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 25eb229..da70f4f 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -137,6 +137,10 @@
                     }
                 }
 
+                @Override
+                public void flush() throws HyracksDataException {
+                }
+
             }
 
             IFrameWriter endPipe = new TupleOuterProduct();
@@ -166,6 +170,9 @@
                 }
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index bca301f..cf9e358 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -93,6 +93,10 @@
                     writer.close();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 79d9b7d..fb29af5 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -171,6 +171,14 @@
                     writer.fail();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index a2b9652..dc34725 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -59,6 +59,10 @@
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5fcb9ef..23ad31a 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -69,8 +69,8 @@
             writer.fail();
         }
 
-        public void forceFlush() throws HyracksDataException {
-            appender.flush(writer, true);
+        @Override
+        public void flush() throws HyracksDataException {
         }
     }
 }
\ No newline at end of file
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 3d1eb06..cd382f8 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -89,7 +89,6 @@
                     if (isOpen[i]) {
                         try {
                             tupleAppender.reset(writeBuffers[i], false);
-                            // ? by JF why didn't clear the buffer ?
                             tupleAppender.flush(writers[i], false);
                         } catch (Throwable th) {
                             if (hde == null) {
@@ -110,6 +109,15 @@
                 }
                 if (hde != null) {
                     throw hde;
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                for (int i = 0; i < outputArity; i++) {
+                    tupleAppender.reset(writeBuffers[i], false);
+                    tupleAppender.flush(writers[i], true);
+                    writers[i].flush();
                 }
             }
 
@@ -172,6 +180,7 @@
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
+                tupleAppender.reset(writeBuffers[outputIndex], false);
                 FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, 
tupleBuilder.getFieldEndOffsets(),
                         tupleBuilder.getByteArray(), 0, 
tupleBuilder.getSize());
             }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 8a5f38c..a66a06f 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -160,6 +160,14 @@
                 }
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
+
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index c069e4c..639fabc 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -95,4 +95,8 @@
     @Override
     public void fail() throws HyracksDataException {
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index ef172c7..dd47ebe 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -147,6 +147,14 @@
                 return lim;
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
+
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 2cea90d..bf089b0 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -84,6 +84,14 @@
 
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
+
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 75c3d08..30022ed 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -157,6 +157,14 @@
                     }
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
         };
     }
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 395d321..2469784 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -188,6 +188,10 @@
                 // close the following operator in the chain
                 super.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 6b21cda..f3a95cf 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -159,6 +159,14 @@
                     }
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                if (appender.getTupleCount() > 0) {
+                    appender.flush(writer, true);
+                }
+                writer.flush();
+            }
         };
     }
 
diff --git 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index f6c3ad0..222aed2 100644
--- 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -39,14 +39,13 @@
  * <ul>
  * <li>{@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
  * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, the {@link IFrameWriter} remains in the OPENED state.</li>
+ * <li>{@link IFrameWriter#flush()} to push tuples that are ready in the output frame. The {@link IFrameWriter} remains in the OPENED state.</li>
  * <li>{@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
  * </ul>
  * In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
  * state and give up all resources.
  * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
  * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} must still be called by the producer.
- *
- * @author vinayakb
  */
 public interface IFrameWriter {
     /**
@@ -56,7 +55,6 @@
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
-     *
      * @param buffer
      *            - Buffer containing data.
      * @throws HyracksDataException
@@ -64,16 +62,20 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
 
     /**
+     * Request this writer to push the contents of its output frame forward and to flush its consumers.
+     * @throws HyracksDataException
+     */
+    public void flush() throws HyracksDataException;
+
+    /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
-     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
-     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;
diff --git 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
index a5d3d4e..0b8ffc0 100644
--- 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
+++ 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
@@ -22,13 +22,10 @@
 
 /**
  * Accepts data from data producers.
- * 
- * @author vinayakb
  */
 public interface IDataWriter<T> {
     /**
      * Pushes data to the acceptor.
-     * 
      * @param data
      *            - Data pushed to the acceptor. <code>null</code> indicates 
the
      *            end of stream.
@@ -38,14 +35,18 @@
 
     /**
      * Indicates that the stream has failed.
-     * 
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
+     * Request the writer to flush its contents.
+     * @throws HyracksDataException
+     */
+    public void flush() throws HyracksDataException;
+
+    /**
      * Closes this writer.
-     * 
      * @throws Exception
      */
     public void close() throws HyracksDataException;
diff --git 
a/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
 
b/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index ed4279d..b5f359c 100644
--- 
a/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ 
b/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -118,4 +118,10 @@
             }
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // At the network boundary.
+        // This frame writer always pushes its content
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9fa511c..78726b5 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -140,4 +140,8 @@
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+
 }
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index ef4c552..dda39ae 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -118,7 +118,7 @@
                 ByteBuffer destFrame = emptyQueue.poll();
                 buffer.position(0);
                 buffer.limit(buffer.capacity());
-                if (destFrame.capacity() < buffer.capacity()){
+                if (destFrame.capacity() < buffer.capacity()) {
                     throw new HyracksDataException("should never happen");
                 }
                 destFrame.clear();
@@ -137,5 +137,9 @@
         public void close() throws HyracksDataException {
             monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 291064d..2c740ab 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -103,4 +103,8 @@
 
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 07b5c53..142fd9c 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -93,8 +93,9 @@
                             MaterializingPipelinedPartition.this.wait();
                         }
                     }
-                    IFileHandle fh = fRef == null ? null : ioManager.open(fRef,
-                            IIOManager.FileReadWriteMode.READ_ONLY, 
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    IFileHandle fh = fRef == null ? null
+                            : ioManager.open(fRef, 
IIOManager.FileReadWriteMode.READ_ONLY,
+                                    
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         writer.open();
                         try {
@@ -195,4 +196,8 @@
             notifyAll();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 2ca1e0f..80bee7f 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -117,4 +117,12 @@
             delegate.close();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        ensureConnected();
+        if (!failed) {
+            delegate.flush();
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index a46fa7b..9023120 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -69,4 +69,9 @@
     public void fail() throws HyracksDataException {
         writer.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index 5e543fa..54ef732 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -80,10 +80,16 @@
             @Override
             public void close() throws HyracksDataException {
                 closeTime = System.currentTimeMillis();
-                ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new 
PartitionId(ctx.getJobletContext()
-                        .getJobId(), cd.getConnectorId(), senderIndex, 
receiverIndex), openTime, closeTime, mrep));
+                ((Task) ctx)
+                        .setPartitionSendProfile(new PartitionProfile(new 
PartitionId(ctx.getJobletContext().getJobId(),
+                                cd.getConnectorId(), senderIndex, 
receiverIndex), openTime, closeTime, mrep));
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 7d9d643..3657b7e 100644
--- 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -58,4 +58,9 @@
     @Override
     public void fail() {
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 918ebd9..477f231 100644
--- 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -88,4 +88,9 @@
     public void fail() throws HyracksDataException {
         frameWriter.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        tupleAppender.flush(frameWriter, true);
+    }
 }
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 2af40fd..12ee24b 100644
--- 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -86,4 +86,8 @@
         }
         return new RunFileReader(file, ioManager, size, true);
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
 
b/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
index 05710ad..5fc692c 100644
--- 
a/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
+++ 
b/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
@@ -26,9 +26,6 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
@@ -44,6 +41,8 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.junit.Before;
+import org.junit.Test;
 
 public class FrameFixedFieldTupleAppenderTest {
 
@@ -51,22 +50,19 @@
     static final int TEST_FRAME_SIZE = 256;
 
     FrameFixedFieldAppender appender;
-    static ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-    };
+    static ISerializerDeserializer[] fields = new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), 
IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), };
     static RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
     static ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(recordDescriptor.getFieldCount());
 
-    class SequetialDataVerifier implements IFrameWriter {
+    class SequentialDataVerifier implements IFrameWriter {
 
         private final IFrameTupleAccessor accessor;
         private IFrameTupleAccessor innerAccessor;
         private int tid;
 
-        public SequetialDataVerifier(IFrameTupleAccessor accessor) {
+        public SequentialDataVerifier(IFrameTupleAccessor accessor) {
             this.accessor = accessor;
             this.innerAccessor = new FrameTupleAccessor(recordDescriptor);
         }
@@ -87,7 +83,8 @@
         private void validate(IFrameTupleAccessor innerAccessor, int i) {
             assertTrue(tid < accessor.getTupleCount());
             assertEquals(accessor.getTupleLength(tid), 
innerAccessor.getTupleLength(i));
-            assertArrayEquals(Arrays.copyOfRange(accessor.getBuffer().array(), 
accessor.getTupleStartOffset(tid),
+            assertArrayEquals(
+                    Arrays.copyOfRange(accessor.getBuffer().array(), 
accessor.getTupleStartOffset(tid),
                             accessor.getTupleEndOffset(tid)),
                     Arrays.copyOfRange(innerAccessor.getBuffer().array(), 
innerAccessor.getTupleStartOffset(i),
                             innerAccessor.getTupleEndOffset(i)));
@@ -102,6 +99,10 @@
         @Override
         public void close() throws HyracksDataException {
             assertEquals(accessor.getTupleCount(), tid);
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
         }
     }
 
@@ -143,7 +144,7 @@
     }
 
     private IFrameWriter prepareValidator(IFrameTupleAccessor accessor) throws 
HyracksDataException {
-        return new SequetialDataVerifier(accessor);
+        return new SequentialDataVerifier(accessor);
     }
 
     enum DATA_TYPE {
@@ -154,8 +155,8 @@
 
     private IFrameTupleAccessor prepareData(DATA_TYPE type) throws 
HyracksDataException {
         IFrameTupleAccessor accessor = new 
FrameTupleAccessor(recordDescriptor);
-        IFrameTupleAppender appender = new FrameTupleAppender(
-                new VSizeFrame(new FrameManager(INPUT_BUFFER_SIZE)), true);
+        IFrameTupleAppender appender = new FrameTupleAppender(new 
VSizeFrame(new FrameManager(INPUT_BUFFER_SIZE)),
+                true);
         int i = 0;
         do {
             switch (type) {
@@ -169,8 +170,8 @@
                     makeABigObjectTuple(tupleBuilder, i++);
                     break;
             }
-        } while (appender
-                .append(tupleBuilder.getFieldEndOffsets(), 
tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()));
+        } while (appender.append(tupleBuilder.getFieldEndOffsets(), 
tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize()));
         accessor.reset(appender.getBuffer());
         return accessor;
     }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
index 52b4abe..1aa48e5 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
@@ -20,10 +20,16 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractUnaryInputSinkOperatorNodePushable extends 
AbstractUnaryInputOperatorNodePushable {
     @Override
     public final void setOutputFrameWriter(int index, IFrameWriter writer, 
RecordDescriptor recordDesc) {
         throw new IllegalStateException();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Sink operator, nothing to flush
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index ee8c656..b6def39 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -151,4 +151,11 @@
             throw closeException;
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (IFrameWriter pWriter : pWriters) {
+            pWriter.flush();
+        }
+    }
 }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 7a3a019..8b0486a 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -110,6 +110,13 @@
                     epWriters[i].open();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                for (IFrameWriter writer : epWriters) {
+                    writer.flush();
+                }
+            }
         };
     }
 
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 336272c..4c10e4f 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -112,7 +112,6 @@
         for (int i = 0; i < tupleCount; ++i) {
             int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
             FrameUtils.appendToWriter(pWriters[h], appenders[h], 
tupleAccessor, i);
-
         }
     }
 
@@ -142,4 +141,13 @@
             throw failException;
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        int consumerPartitionCount = appenders.length;
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            appenders[i].flush(pWriters[i], true);
+            pWriters[i].flush();
+        }
+    }
 }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 24b3bf9..35ae0da 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -108,6 +108,11 @@
         public void fail() throws HyracksDataException {
             // do nothing
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            // do nothing
+        }
     }
 
     @Override
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 8228367..7d95a39 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -70,6 +70,10 @@
                 throw new HyracksDataException(e);
             }
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     private static final long serialVersionUID = 1L;
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index d041d5e..24e5689 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -126,4 +126,9 @@
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Do nothing
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 7539abf..515987a 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -73,4 +73,9 @@
     public void close() throws HyracksDataException {
         pgw.close();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        pgw.flush();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 1c08b53..ca48fd0 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -184,4 +184,8 @@
             appenderWrapper.close();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 7badc1e..2d6ebb2 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -571,6 +571,10 @@
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 87ac9bd..c2cb13f 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -251,6 +251,10 @@
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index a12450e..76a0853 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -215,6 +215,10 @@
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c0c467a..f40f5d5 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -115,15 +115,15 @@
     private final int[] probeKeys;
     private final int[] buildKeys;
     private final IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem 
HJ
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory0; 
//For NLJ in probe
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory1; 
//For NLJ in probe
+    private final IBinaryComparatorFactory[] comparatorFactories; // For 
in-mem HJ
+    private final ITuplePairComparatorFactory tuplePairComparatorFactory0; // 
For NLJ in probe
+    private final ITuplePairComparatorFactory tuplePairComparatorFactory1; // 
For NLJ in probe
     private final IPredicateEvaluatorFactory predEvaluatorFactory;
 
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
-    //Flags added for test purpose
+    // Flags added for test purpose
     private static boolean skipInMemoryHJ = false;
     private static boolean forceNLJ = false;
     private static boolean forceRR = false;
@@ -195,7 +195,7 @@
 
     }
 
-    //memorySize is the memory for join (we have already excluded the 2 
buffers for in/out)
+    // memorySize is the memory for join (we have already excluded the 2 
buffers for in/out)
     private int getNumberOfPartitions(int memorySize, int buildSize, double 
factor, int nPartitions)
             throws HyracksDataException {
         int numberOfPartitions = 0;
@@ -203,11 +203,11 @@
             throw new HyracksDataException("not enough memory is available for 
Hybrid Hash Join");
         }
         if (memorySize > buildSize) {
-            return 1; //We will switch to in-Mem HJ eventually
+            return 1; // We will switch to in-Mem HJ eventually
         }
         numberOfPartitions = (int) (Math.ceil((buildSize * factor / 
nPartitions - memorySize) / (memorySize - 1)));
         if (numberOfPartitions <= 0) {
-            numberOfPartitions = 1; //becomes in-memory hash join
+            numberOfPartitions = 1; // becomes in-memory hash join
         }
         if (numberOfPartitions > memorySize) {
             numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor 
/ nPartitions));
@@ -284,7 +284,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    if (memsize <= 2) { //Dedicated buffers: One buffer to 
read and one buffer for output
+                    if (memsize <= 2) { // Dedicated buffers: One buffer to 
read and one buffer for output
                         throw new HyracksDataException("not enough memory for 
Hybrid Hash Join");
                     }
                     state.memForJoin = memsize - 2;
@@ -403,7 +403,7 @@
                                 .nextSetBit(pid + 1)) {
                             RunFileReader bReader = 
state.hybridHJ.getBuildRFReader(pid);
                             RunFileReader pReader = 
state.hybridHJ.getProbeRFReader(pid);
-                            if (bReader == null || pReader == null) { //either 
of sides (or both) does not have any tuple, thus no need for joining (no 
potential match)
+                            if (bReader == null || pReader == null) { // 
either of sides (or both) does not have any tuple, thus no need for joining (no 
potential match)
                                 continue;
                             }
                             int bSize = 
state.hybridHJ.getBuildPartitionSizeInTup(pid);
@@ -435,11 +435,11 @@
                             + buildPartSize + "\tProbeSize:\t" + probePartSize 
+ " - MemForJoin " + (state.memForJoin)
                             + "  - LeftOuter is " + isLeftOuter);
 
-                    //Apply in-Mem HJ if possible
+                    // Apply in-Mem HJ if possible
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
                             || (probePartSize < state.memForJoin && 
!isLeftOuter)) {
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize < 
probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || (buildPartSize < 
probePartSize))) { // Case 1.1 - InMemHJ (wout Role-Reversal)
                             LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || 
buildSize<probe) AND ApplyInMemHJ - [Level "
                                     + level + "]");
                             tabSize = wasReversed ? 
ohhj.getProbePartitionSizeInTup(pid)
@@ -448,10 +448,10 @@
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. 
Invalid table size for inMemoryHashJoin.");
                             }
-                            //Build Side is smaller
+                            // Build Side is smaller
                             applyInMemHashJoin(buildKeys, probeKeys, tabSize, 
probeRd, buildRd, probeHpc, buildHpc,
-                                    buildSideReader, probeSideReader, false, 
pid); //checked-confirmed
-                        } else { //Case 1.2 - InMemHJ with Role Reversal
+                                    buildSideReader, probeSideReader, false, 
pid); // checked-confirmed
+                        } else { // Case 1.2 - InMemHJ with Role Reversal
                             LOGGER.fine(
                                     "\t>>>Case 1.2. (NoIsLeftOuter || 
probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
                                             + level + "]");
@@ -461,23 +461,23 @@
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. 
Invalid table size for inMemoryHashJoin.");
                             }
-                            //Probe Side is smaller
+                            // Probe Side is smaller
                             applyInMemHashJoin(probeKeys, buildKeys, tabSize, 
buildRd, probeRd, buildHpc, probeHpc,
-                                    probeSideReader, buildSideReader, true, 
pid); //checked-confirmed
+                                    probeSideReader, buildSideReader, true, 
pid); // checked-confirmed
                         }
                     }
-                    //Apply (Recursive) HHJ
+                    // Apply (Recursive) HHJ
                     else {
                         LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " 
+ level + "]");
                         OptimizedHybridHashJoin rHHj;
-                        if (!forceRR && (isLeftOuter || buildPartSize < 
probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || buildPartSize < 
probePartSize)) { // Case 2.1 - Recursive HHJ (wout Role-Reversal)
                             LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH 
(isLeftOuter || build<probe) - [Level "
                                     + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, 
(int) buildPartSize, fudgeFactor,
                                     nPartitions);
                             rHHj = new OptimizedHybridHashJoin(ctx, 
state.memForJoin, n, PROBE_REL, BUILD_REL,
                                     probeKeys, buildKeys, comparators, 
probeRd, buildRd, probeHpc, buildHpc,
-                                    predEvaluator); //checked-confirmed
+                                    predEvaluator); // checked-confirmed
 
                             buildSideReader.open();
                             rHHj.initBuild();
@@ -502,7 +502,7 @@
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitionStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD 
* beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD 
* beforeMax))) { // Case 2.1.1 - Keep applying HHJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.1 - KEEP APPLYING 
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
@@ -515,10 +515,10 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, 
rPid, afterMax, (level + 1), false); //checked-confirmed
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, 
rPid, afterMax, (level + 1), false); // checked-confirmed
                                 }
 
-                            } else { //Case 2.1.2 - Switch to NLJ
+                            } else { // Case 2.1.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.2 - SWITCHED to NLJ 
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
@@ -535,22 +535,22 @@
                                     int probeSideInTups = 
rHHj.getProbePartitionSizeInTup(rPid);
                                     if (isLeftOuter || buildSideInTups < 
probeSideInTups) {
                                         applyNestedLoopJoin(buildRd, probeRd, 
memsize, rprfw, rbrfw, nljComparator0,
-                                                false); //checked-modified
+                                                false); // checked-modified
                                     } else {
                                         applyNestedLoopJoin(probeRd, buildRd, 
memsize, rbrfw, rprfw, nljComparator1,
-                                                true); //checked-modified
+                                                true); // checked-modified
                                     }
                                 }
                             }
-                        } else { //Case 2.2 - Recursive HHJ (with 
Role-Reversal)
+                        } else { // Case 2.2 - Recursive HHJ (with 
Role-Reversal)
                             LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH 
RoleReversal - [Level " + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, 
(int) probePartSize, fudgeFactor,
                                     nPartitions);
 
                             rHHj = new OptimizedHybridHashJoin(ctx, 
state.memForJoin, n, BUILD_REL, PROBE_REL,
                                     buildKeys, probeKeys, comparators, 
buildRd, probeRd, buildHpc, probeHpc,
-                                    predEvaluator); //checked-confirmed
-                            rHHj.setIsReversed(true); //Added to use 
predicateEvaluator (for inMemoryHashJoin) correctly
+                                    predEvaluator); // checked-confirmed
+                            rHHj.setIsReversed(true); // Added to use 
predicateEvaluator (for inMemoryHashJoin) correctly
 
                             probeSideReader.open();
                             rHHj.initBuild();
@@ -572,7 +572,7 @@
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitionStatus();
 
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD 
* beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD 
* beforeMax))) { // Case 2.2.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING 
RecursiveHHJ WITH RoleReversal - [Level "
                                         + level + "]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid 
>= 0; rPid = rPStatus
@@ -584,9 +584,9 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, 
rPid, afterMax, (level + 1), true); //checked-confirmed
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, 
rPid, afterMax, (level + 1), true); // checked-confirmed
                                 }
-                            } else { //Case 2.2.2 - Switch to NLJ
+                            } else { // Case 2.2.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.2.2 - SWITCHED to NLJ 
RecursiveHHJ WITH RoleReversal - [Level "
                                                 + level + "]");
@@ -603,10 +603,10 @@
                                     long probeSideSize = rprfw.getFileSize();
                                     if (buildSideSize > probeSideSize) {
                                         applyNestedLoopJoin(buildRd, probeRd, 
memsize, rbrfw, rprfw, nljComparator0,
-                                                true); //checked-modified
+                                                true); // checked-modified
                                     } else {
                                         applyNestedLoopJoin(probeRd, buildRd, 
memsize, rprfw, rbrfw, nljComparator1,
-                                                true); //checked-modified
+                                                true); // checked-modified
                                     }
                                 }
                             }
@@ -629,7 +629,7 @@
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        ByteBuffer copyBuffer = 
ctx.allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a 
copyBuffer, because this buffer gets added to the buffers list in the 
InMemoryHashJoin
+                        ByteBuffer copyBuffer = 
ctx.allocateFrame(rPartbuff.getFrameSize()); // We need to allocate a 
copyBuffer, because this buffer gets added to the buffers list in the 
InMemoryHashJoin
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), 
copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -674,6 +674,10 @@
                     outerReader.close();
                     innerReader.close();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index 048cea2..e4b5bb3 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -62,6 +62,10 @@
         public void writeData(Object[] data) throws HyracksDataException {
             mapper.map(data, writer);
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     private static final long serialVersionUID = 1L;
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
index 971f61d..817c6d2 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -40,7 +40,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext 
ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, 
int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
             public void open() throws HyracksDataException {
@@ -61,6 +61,11 @@
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 5c09f42..3bf115f 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -46,7 +46,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext 
ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, 
int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             private FrameTupleAccessor fta;
@@ -73,7 +73,7 @@
                             FrameUtils.appendToWriter(writer, partialAppender, 
fta, i);
                             currentSize++;
                         }
-                        partialAppender.flush(writer,false);
+                        partialAppender.flush(writer, false);
                         finished = true;
                     } else {
                         FrameUtils.flushFrame(buffer, writer);
@@ -92,6 +92,11 @@
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 36bf919..6f48307 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -30,8 +30,6 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -61,8 +59,8 @@
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
         if (isSingleActivity) {
-            MaterializerReaderActivityNode mra = new 
MaterializerReaderActivityNode(new ActivityId(odId,
-                    MATERIALIZER_READER_ACTIVITY_ID));
+            MaterializerReaderActivityNode mra = new 
MaterializerReaderActivityNode(
+                    new ActivityId(odId, MATERIALIZER_READER_ACTIVITY_ID));
 
             builder.addActivity(this, mra);
             builder.addSourceEdge(0, mra, 0);
@@ -97,8 +95,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(), new 
TaskId(getActivityId(),
-                            partition));
+                    state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.open(ctx);
                 }
 
@@ -115,6 +113,10 @@
                 public void close() throws HyracksDataException {
                     state.close();
                     state.writeOut(writer, new VSizeFrame(ctx));
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
                 }
 
             };
@@ -136,8 +138,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(), new 
TaskId(getActivityId(),
-                            partition));
+                    state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.open(ctx);
                 }
 
@@ -172,8 +174,8 @@
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) 
ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), MATERIALIZER_ACTIVITY_ID), 
partition));
+                    MaterializerTaskState state = (MaterializerTaskState) 
ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), 
MATERIALIZER_ACTIVITY_ID), partition));
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 792a041..ae6ed72 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -62,6 +62,10 @@
         public void setDataWriter(int index, IOpenableDataWriter<Object[]> 
writer) {
             throw new IllegalArgumentException();
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     @Override
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index feff13c..81e95df 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -139,6 +139,13 @@
                 }
 
                 @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].flush();
+                    }
+                }
+
+                @Override
                 public void close() throws HyracksDataException {
                     HyracksDataException hde = null;
                     try {
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 4f12db1..2cdd6da 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -88,8 +88,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new 
CollectTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new 
CollectTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.buffer = new ArrayList<Object[]>();
                 }
 
@@ -107,9 +107,13 @@
                 public void fail() throws HyracksDataException {
 
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, 
recordDescProvider.getInputRecordDescriptor(
-                    getActivityId(), 0));
+            return new DeserializedOperatorNodePushable(ctx, op,
+                    
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         }
     }
 
@@ -138,8 +142,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (CollectTaskState) ctx.getStateObject(new 
TaskId(new ActivityId(getOperatorId(),
-                            COLLECT_ACTIVITY_ID), partition));
+                    state = (CollectTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), 
COLLECT_ACTIVITY_ID), partition));
                 }
 
                 @Override
@@ -161,9 +165,14 @@
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, 
recordDescProvider.getOutputRecordDescriptor(
-                    getActivityId(), 0));
+            return new DeserializedOperatorNodePushable(ctx, op,
+                    
recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0));
         }
     }
 
@@ -171,7 +180,8 @@
 
     private final int splits;
 
-    public SplitVectorOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
splits, RecordDescriptor recordDescriptor) {
+    public SplitVectorOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
splits,
+            RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.splits = splits;
         recordDescriptors[0] = recordDescriptor;
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index ea46682..b07549b 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -29,9 +29,9 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
-import 
org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
 import 
org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import 
org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameFreeSlotPolicy;
 import 
org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
@@ -53,7 +53,7 @@
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
@@ -61,7 +61,7 @@
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit, int outputLimit)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         this.ctx = ctx;
         maxSortFrames = framesLimit - 1;
 
@@ -98,12 +98,14 @@
         }
     }
 
+    @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                
.createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIOManager());
     }
 
+    @Override
     protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) 
throws HyracksDataException {
         return writer;
     }
@@ -113,4 +115,8 @@
         return frameSorter;
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index bf4f621..dc15603 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -65,8 +65,7 @@
     public void open() throws HyracksDataException {
         IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * 
ctx.getInitialFrameSize());
         ITupleBufferManager bufferManager = new 
VariableTupleMemoryManager(framePool, recordDescriptor);
-        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, 
sortFields, nmkFactory,
-                comparatorFactories);
+        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, 
sortFields, nmkFactory, comparatorFactories);
         super.open();
     }
 
@@ -77,8 +76,8 @@
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                HeapSortRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                
.createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIOManager());
     }
 
@@ -100,4 +99,8 @@
         }
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+
 }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 289b879..e76d2d3 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -121,6 +121,13 @@
                         }
                     }
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.flush();
+                    }
+                }
             };
         }
     }
diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index b656b46..92d18f0 100644
--- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ 
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -76,4 +76,9 @@
     public String getDisplayName() {
         return "Deserialized(" + delegate + ")";
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        delegate.flush();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
index 41e63a0..995ae30 100644
--- 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
+++ 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -26,8 +26,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -45,6 +43,7 @@
 import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Test;
 
 public class SerializationDeserializationTest {
     private static final Logger LOGGER = 
Logger.getLogger(SerializationDeserializationTest.class.getName());
@@ -82,6 +81,10 @@
                 @Override
                 public void fail() throws HyracksDataException {
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             });
         }
 
diff --git 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index fb816da..6b08c3e 100644
--- 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++ 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -187,6 +187,10 @@
                         public void close() throws HyracksDataException {
 
                         }
+
+                        @Override
+                        public void flush() throws HyracksDataException {
+                        }
                     };
                 }
 
diff --git 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index 5d300f4..bba18b3 100644
--- 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -173,6 +173,10 @@
                         public void close() throws HyracksDataException {
 
                         }
+
+                        @Override
+                        public void flush() throws HyracksDataException {
+                        }
                     };
                 }
 
diff --git 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
index 43e7649..f621bf9 100644
--- 
a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
+++ 
b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -34,8 +34,6 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.FixedSizeFrame;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -45,8 +43,9 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+import org.junit.Test;
 
 public class TopKRunGeneratorTest {
 
@@ -90,14 +89,18 @@
         public void close() throws HyracksDataException {
             assertTrue(answer.isEmpty());
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     @Test
     public void testReverseOrderedDataShouldNotGenerateAnyRuns() throws 
HyracksDataException {
         int topK = 1;
         IHyracksTaskContext ctx = 
AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
@@ -106,8 +109,8 @@
     public void testAlreadySortedDataShouldNotGenerateAnyRuns() throws 
HyracksDataException {
         int topK = SORT_FRAME_LIMIT;
         IHyracksTaskContext ctx = 
AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.INORDER, sorter);
     }
@@ -116,8 +119,8 @@
     public void testHybridTopKShouldNotGenerateAnyRuns() throws 
HyracksDataException {
         int topK = 1;
         IHyracksTaskContext ctx = 
AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
@@ -126,8 +129,8 @@
     public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
         int topK = 1;
         IHyracksTaskContext ctx = 
AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, 
SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
     }
 
@@ -148,8 +151,8 @@
         int minRecordSize = 16;
         int maxRecordSize = 64;
 
-        AbstractRunGeneratorTest
-                .prepareData(ctx, frameList, minDataSize, minRecordSize, 
maxRecordSize, null, keyValuePair);
+        AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, 
minRecordSize, maxRecordSize, null,
+                keyValuePair);
 
         assert topK > 0;
 
diff --git 
a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index a97441c..2226b03 100644
--- 
a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -40,7 +40,8 @@
 
     public BTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor 
opDesc, IHyracksTaskContext ctx,
             int partition, IRecordDescriptorProvider recordDescProvider, int[] 
lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, int[] 
minFilterFieldIndexes, int[] maxFilterFieldIndexes) {
+            boolean lowKeyInclusive, boolean highKeyInclusive, int[] 
minFilterFieldIndexes,
+            int[] maxFilterFieldIndexes) {
         super(opDesc, ctx, partition, recordDescProvider, 
minFilterFieldIndexes, maxFilterFieldIndexes);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
@@ -75,8 +76,8 @@
         ITreeIndex treeIndex = (ITreeIndex) index;
         lowKeySearchCmp = 
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
         highKeySearchCmp = 
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), 
highKey);
-        return new RangePredicate(lowKey, highKey, lowKeyInclusive, 
highKeyInclusive, lowKeySearchCmp,
-                highKeySearchCmp, minFilterKey, maxFilterKey);
+        return new RangePredicate(lowKey, highKey, lowKeyInclusive, 
highKeyInclusive, lowKeySearchCmp, highKeySearchCmp,
+                minFilterKey, maxFilterKey);
     }
 
     @Override
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 631b2f1..59d852e 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -91,7 +91,6 @@
             }
         }
         FrameUtils.flushFrame(buffer, writer);
-
     }
 
     @Override
@@ -113,6 +112,11 @@
     }
 
     @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+
+    @Override
     public void fail() throws HyracksDataException {
         if (index != null) {
             writer.fail();
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 6c7e8d0..6736420 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -162,4 +162,9 @@
             writer.fail();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 568bde8..41a8c62 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -188,6 +188,14 @@
     }
 
     @Override
+    public void flush() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            appender.flush(writer, true);
+        }
+        writer.flush();
+    }
+
+    @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
         if (index != null) {
diff --git 
a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index c91aff7..64b2e7f 100644
--- 
a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -164,4 +164,10 @@
     public void fail() throws HyracksDataException {
         writer.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer, true);
+        writer.flush();
+    }
 }
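
Note on the new contract (this paragraph and the sketch below are editorial illustration, not part of the patch): the flush() implementations added above fall into two patterns. Pass-through operators (e.g. IdentityOperatorDescriptor, LimitOperatorDescriptor, UnionAllOperatorDescriptor, the index insert/bulk-load pushables) forward the call to their downstream writer, draining any partially filled appender first, so a flush initiated by a long-running feed propagates all the way to storage; sinks and blocking operators (the sort run generators, materializers, the preclustered group writer, the join build phases) implement it as a no-op because they hold nothing that can meaningfully be pushed mid-stream. A minimal sketch of the pass-through case, under the assumption that AbstractUnaryInputUnaryOutputOperatorNodePushable exposes the protected writer field used throughout the diff, looks like this; the class name ExampleForwardingPushable is hypothetical and not introduced by this change.

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

// Illustrative sketch only: ExampleForwardingPushable is a hypothetical name,
// not a class added by this patch. It shows the pass-through flush() pattern
// used by the unary-input/unary-output operators above.
public abstract class ExampleForwardingPushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    @Override
    public void flush() throws HyracksDataException {
        // An operator that buffers tuples in a FrameTupleAppender would first
        // push the partial frame downstream (as IndexSearchOperatorNodePushable
        // does with appender.flush(writer, true)), then delegate so the flush
        // cascades through the pipeline toward storage.
        writer.flush();
    }
}

Under this sketch, a flush() issued at the head of a feed pipeline causes each such operator to drain its buffers and delegate downstream until the call reaches the storage writer.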

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/584
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I85424bab7965b71aac709280af066e1655457aa3
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
