abdullah alamoudi has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Follow IFrameWriter protocol in 
AbstractOneInputPushRuntime
......................................................................


[NO ISSUE][RT] Follow IFrameWriter protocol in AbstractOneInputPushRuntime

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Many implementations of AbstractOneInputPushRuntime didn't
  follow the IFrameWriter protocol causing many unexpected
  runtime exceptions.
- This change ensures that all of the subclasses implement the
  protocol correctly.

Change-Id: I5133007f298366f58b53acc9f48bc553724dd7b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2884
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
---
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
16 files changed, 63 insertions(+), 129 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Jenkins: Verified; ; Verified
  Michael Blow: Looks good to me, approved

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 74ba139..2692cc7 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -89,7 +89,7 @@
                 return;
             }
             initAccessAppend(ctx);
-            writer.open();
+            super.open();
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
         }
@@ -139,31 +139,6 @@
     protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] 
primaryKeyFields) {
         MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, 
longHashes);
         return Math.abs((int) longHashes[0]);
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        if (isSink) {
-            return;
-        }
-        writer.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (isSink) {
-            return;
-        }
-        try {
-            flushIfNotFailed();
-        } catch (Exception e) {
-            writer.fail();
-            throw e;
-        } finally {
-            writer.close();
-        }
-        appender.reset(frame, true);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index e99b61b..1f9cb91 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -67,7 +67,6 @@
             private ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(aggregs.length);
 
             private boolean first = true;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -81,8 +80,7 @@
                 for (int i = 0; i < aggregFactories.length; i++) {
                     aggregs[i].init();
                 }
-                isOpen = true;
-                writer.open();
+                super.open();
             }
 
             @Override
@@ -119,14 +117,6 @@
             private void processTuple(FrameTupleReference tupleRef) throws 
HyracksDataException {
                 for (int f = 0; f < aggregs.length; f++) {
                     aggregs[f].step(tupleRef);
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                failed = true;
-                if (isOpen) {
-                    writer.fail();
                 }
             }
         };
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index a7468a7..71b44d3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -51,25 +52,20 @@
 
     @Override
     public void close() throws HyracksDataException {
-        HyracksDataException closeException = null;
+        if (!isOpen) {
+            return;
+        }
+        Throwable closeException = null;
         try {
             flushIfNotFailed();
         } catch (Exception e) {
-            closeException = HyracksDataException.create(e);
-            writer.fail();
+            closeException = e;
+            fail(closeException);
         } finally {
-            try {
-                writer.close();
-            } catch (Exception e) {
-                if (closeException == null) {
-                    closeException = HyracksDataException.create(e);
-                } else {
-                    closeException.addSuppressed(e);
-                }
-            }
+            closeException = CleanupUtils.close(writer, closeException);
         }
         if (closeException != null) {
-            throw closeException;
+            throw HyracksDataException.create(closeException);
         }
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 5cced8d..c7d2d94 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -27,6 +27,7 @@
     protected IFrameWriter writer;
     protected RecordDescriptor outputRecordDesc;
     protected boolean failed;
+    protected boolean isOpen;
 
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, 
RecordDescriptor recordDesc) {
@@ -35,8 +36,24 @@
     }
 
     @Override
+    public void open() throws HyracksDataException {
+        isOpen = true;
+        writer.open();
+    }
+
+    @Override
     public void fail() throws HyracksDataException {
         failed = true;
-        writer.fail();
+        if (isOpen) {
+            writer.fail();
+        }
+    }
+
+    protected void fail(Throwable failure) {
+        try {
+            fail();
+        } catch (Throwable th) {
+            failure.addSuppressed(th);
+        }
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index 35563e0..cccfd62 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -32,16 +32,6 @@
     }
 
     @Override
-    public void close() throws HyracksDataException {
-        // close is a no op since this operator completes operating in open()
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        // fail is a no op since if a failure happened, the operator would've 
already called fail() on downstream
-    }
-
-    @Override
     public void flush() throws HyracksDataException {
         // flush will never be called on this runtime
         throw new UnsupportedOperationException();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 159fde7..3cee12d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -151,7 +151,8 @@
 
         @Override
         public void open() throws HyracksDataException {
-            writer.open();
+            // writer opened many times?
+            super.open();
             if (first) {
                 first = false;
                 initAccessAppendRef(ctx);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index f251bb7..2453029 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import 
org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
@@ -69,7 +70,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (frameSorter == null) {
                     IFrameBufferManager manager = new 
VariableFrameMemoryManager(
                             new VariableFramePool(ctx, 
VariableFramePool.UNLIMITED_MEMORY),
@@ -87,11 +88,22 @@
 
             @Override
             public void close() throws HyracksDataException {
-                try {
-                    frameSorter.sort();
-                    frameSorter.flush(writer);
-                } finally {
-                    writer.close();
+                Throwable failure = null;
+                if (isOpen) {
+                    try {
+                        if (!failed) {
+                            frameSorter.sort();
+                            frameSorter.flush(writer);
+                        }
+                    } catch (Throwable th) {
+                        failure = th;
+                        fail(th);
+                    } finally {
+                        failure = CleanupUtils.close(writer, failure);
+                    }
+                }
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
                 }
             }
         };
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index b1b652f..5b36c5f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,7 +96,6 @@
             private IScalarEvaluator[] eval = new 
IScalarEvaluator[evalFactories.length];
             private ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
-            private boolean isOpen = false;
             private int tupleIndex = 0;
 
             @Override
@@ -109,15 +108,7 @@
                         eval[i] = evalFactories[i].createScalarEvaluator(ctx);
                     }
                 }
-                isOpen = true;
-                writer.open();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                if (isOpen) {
-                    super.close();
-                }
+                super.open();
             }
 
             @Override
@@ -173,13 +164,6 @@
                     }
                 } catch (HyracksDataException e) {
                     throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, 
tupleIndex);
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    super.fail();
                 }
             }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 7bd924d..9ca3cd6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -48,7 +48,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (!appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize())) {
                     throw new IllegalStateException();
                 }
@@ -56,13 +56,10 @@
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
             public void close() throws HyracksDataException {
-                writer.close();
+                if (isOpen) {
+                    writer.close();
+                }
             }
 
             @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index f94672d..832cb22 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -49,11 +49,6 @@
             initAccessAppend(ctx);
         }
 
-        @Override
-        public void open() throws HyracksDataException {
-            writer.open();
-        }
-
         public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws 
HyracksDataException {
             tAccess.reset(inputBuffer);
             appendTupleToFrame(tIndex);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 33b7725..ca58d4d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -89,7 +89,6 @@
             private final IRunningAggregateEvaluator[] raggs = new 
IRunningAggregateEvaluator[runningAggregates.length];
             private final ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -104,22 +103,7 @@
                 for (int i = 0; i < runningAggregates.length; i++) {
                     raggs[i].init();
                 }
-                isOpen = true;
-                writer.open();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                if (isOpen) {
-                    super.close();
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    writer.fail();
-                }
+                super.open();
             }
 
             @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 59df402..aca5bf1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -72,7 +72,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (evalMaxObjects == null) {
                     initAccessAppendRef(ctx);
                     evalMaxObjects = 
maxObjectsEvalFactory.createScalarEvaluator(ctx);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index a8ca082..713a99c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -53,7 +53,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (first) {
                     first = false;
                     initAccessAppend(ctx);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 171544d..933e640 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -92,8 +92,7 @@
                     initAccessAppendFieldRef(ctx);
                     eval = cond.createScalarEvaluator(ctx);
                 }
-                writer.open();
-
+                super.open();
                 //prepare nullTupleBuilder
                 if (retainMissing && missingWriter == null) {
                     missingWriter = missingWriterFactory.createMissingWriter();
@@ -101,15 +100,6 @@
                     DataOutput out = missingTupleBuilder.getDataOutput();
                     missingWriter.writeMissing(out);
                     missingTupleBuilder.addFieldEndOffset();
-                }
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                try {
-                    flushIfNotFailed();
-                } finally {
-                    writer.close();
                 }
             }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 53974b2..7e5c346 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -43,6 +43,9 @@
     private char fieldDelimiter;
     private ITupleParserFactory parserFactory;
 
+    /*
+     * NOTE: This operator doesn't follow the IFrameWriter protocol
+     */
     public StringStreamingRuntimeFactory(String command, IPrinterFactory[] 
printerFactories, char fieldDelimiter,
             ITupleParserFactory parserFactory) {
         super(null);
@@ -129,7 +132,6 @@
                     first = false;
                     initAccessAppendRef(ctx);
                 }
-
                 try {
                     ITupleParser parser = parserFactory.createTupleParser(ctx);
                     process = Runtime.getRuntime().exec(command);
@@ -141,6 +143,7 @@
                             new 
DumpInStreamToPrintStream(process.getErrorStream(), System.err);
                     dumpStderr = new Thread(disps);
                     dumpStderr.start();
+                    super.open();
                 } catch (IOException e) {
                     throw HyracksDataException.create(e);
                 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 914f4a0..22189ac 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -94,7 +94,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (tRef == null) {
                     initAccessAppendRef(ctx);
                 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2884
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5133007f298366f58b53acc9f48bc553724dd7b5
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to