abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2463

Change subject: [NO ISSUE][ING] Follow the IFrameWriter contract in Feed Pipeline
......................................................................

[NO ISSUE][ING] Follow the IFrameWriter contract in Feed Pipeline

Change-Id: Ife679fb9643dc6b39d035e0eecdb915b227503a5
---
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
22 files changed, 121 insertions(+), 672 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/63/2463/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
deleted file mode 100644
index 22d0d6b..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface ITupleForwarder {
-
-    public static final String FORWARD_POLICY = "forward-policy";
-
-    public enum TupleForwardPolicy {
-        FRAME_FULL,
-        COUNTER_TIMER_EXPIRED,
-        RATE_CONTROLLED,
-        FEED
-    }
-
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) 
throws HyracksDataException;
-
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
-
-    public void close() throws HyracksDataException;
-
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index bbd93c2..a324496 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -19,16 +19,13 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public abstract class AbstractDataFlowController implements 
IDataFlowController {
 
-    protected final ITupleForwarder tupleForwarder;
     protected final IHyracksTaskContext ctx;
 
-    public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder 
tupleForwarder) {
+    public AbstractDataFlowController(IHyracksTaskContext ctx) {
         this.ctx = ctx;
-        this.tupleForwarder = tupleForwarder;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 3437de1..54ecaee 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -25,32 +25,18 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public abstract class AbstractFeedDataFlowController implements 
IDataFlowController {
-    protected final FeedTupleForwarder tupleForwarder;
+    protected TupleForwarder tupleForwarder;
     protected final IHyracksTaskContext ctx;
     protected final int numOfFields;
     protected final ArrayTupleBuilder tb;
     protected final FeedLogManager feedLogManager;
     protected boolean flushing;
 
-    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfFields) {
+    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfFields) {
         this.feedLogManager = feedLogManager;
         this.numOfFields = numOfFields;
         this.ctx = ctx;
-        this.tupleForwarder = tupleForwarder;
         this.tb = new ArrayTupleBuilder(numOfFields);
-    }
-
-    @Override
-    public boolean pause() {
-        tupleForwarder.pause();
-        return true;
-    }
-
-    @Override
-    public boolean resume() {
-        tupleForwarder.resume();
-        return true;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 2db17e2..b14722b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -30,11 +30,10 @@
 
     private final IRecordWithPKDataParser<T> dataParser;
 
-    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final 
FeedTupleForwarder tupleForwarder,
-            final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final 
IRecordReader<T> recordReader)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
+    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final 
FeedLogManager feedLogManager,
+            final int numOfOutputFields, final IRecordWithPKDataParser<T> 
dataParser,
+            final IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
         this.dataParser = dataParser;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4447b28..621397b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -28,11 +28,10 @@
 
 public class ChangeFeedWithMetaDataFlowController<T> extends 
FeedWithMetaDataFlowController<T> {
 
-    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, 
final FeedTupleForwarder tupleForwarder,
-            final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final 
IRecordReader<T> recordReader)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
+    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, 
final FeedLogManager feedLogManager,
+            final int numOfOutputFields, final IRecordWithMetadataParser<T> 
dataParser,
+            final IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
deleted file mode 100644
index 10815d9..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CounterTimerTupleForwarder implements ITupleForwarder {
-
-    public static final String BATCH_SIZE = "batch-size";
-    public static final String BATCH_INTERVAL = "batch-interval";
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private int batchSize;
-    private long batchInterval;
-    private int tuplesInFrame = 0;
-    private TimeBasedFlushTask flushTask;
-    private Timer timer;
-    private Object lock = new Object();
-    private boolean activeTimer = false;
-
-    private CounterTimerTupleForwarder(int batchSize, long batchInterval) {
-        this.batchSize = batchSize;
-        this.batchInterval = batchInterval;
-        if (batchInterval > 0L) {
-            activeTimer = true;
-        }
-    }
-
-    // Factory method
-    public static CounterTimerTupleForwarder create(Map<String, String> 
configuration) {
-        int batchSize = -1;
-        long batchInterval = 0L;
-        String propValue = configuration.get(BATCH_SIZE);
-        if (propValue != null) {
-            batchSize = Integer.parseInt(propValue);
-        }
-        propValue = configuration.get(BATCH_INTERVAL);
-        if (propValue != null) {
-            batchInterval = Long.parseLong(propValue);
-        }
-        return new CounterTimerTupleForwarder(batchSize, batchInterval);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        appender.reset(frame, true);
-        this.writer = writer;
-        if (activeTimer) {
-            this.timer = new Timer();
-            this.flushTask = new TimeBasedFlushTask(writer, lock);
-            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
-        }
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (activeTimer) {
-            synchronized (lock) {
-                addTupleToFrame(tb);
-            }
-        } else {
-            addTupleToFrame(tb);
-        }
-        tuplesInFrame++;
-    }
-
-    private void addTupleToFrame(ArrayTupleBuilder tb) throws 
HyracksDataException {
-        if (tuplesInFrame == batchSize
-                || !appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize())) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("flushing frame containg (" + tuplesInFrame + ") 
tuples");
-            }
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            tuplesInFrame = 0;
-            appender.reset(frame, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 
0, tb.getSize())) {
-                throw new 
RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            if (activeTimer) {
-                synchronized (lock) {
-                    FrameUtils.flushFrame(frame.getBuffer(), writer);
-                }
-            } else {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        }
-
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    private class TimeBasedFlushTask extends TimerTask {
-
-        private IFrameWriter writer;
-        private final Object lock;
-
-        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
-            this.writer = writer;
-            this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (tuplesInFrame > 0) {
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("TTL expired flushing frame (" + 
tuplesInFrame + ")");
-                    }
-                    synchronized (lock) {
-                        FrameUtils.flushFrame(frame.getBuffer(), writer);
-                        appender.reset(frame, true);
-                        tuplesInFrame = 0;
-                    }
-                }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 9826be7..22e0c1a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -26,14 +26,12 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -58,10 +56,9 @@
     protected long incomingRecordsCount = 0;
     protected long failedRecordsCount = 0;
 
-    public FeedRecordDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, 
IRecordDataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
+    public FeedRecordDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfOutputFields,
+            IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) 
throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         recordReader.setFeedLogManager(feedLogManager);
@@ -79,7 +76,7 @@
         }
         Throwable failure = null;
         try {
-            tupleForwarder.initialize(ctx, writer);
+            this.tupleForwarder = new TupleForwarder(ctx, writer);
             while (hasNext()) {
                 IRawRecord<? extends T> record = next();
                 if (record == null) {
@@ -102,17 +99,14 @@
                 try {
                     flush();
                 } catch (Exception flushException) {
-                    tupleForwarder.fail();
                     flushException.addSuppressed(e);
                     failure = flushException;
                 }
             } else {
                 failure = e;
-                tupleForwarder.fail();
             }
         } catch (Throwable e) {
             failure = e;
-            tupleForwarder.fail();
             LOGGER.log(Level.WARN, "Failure while operating a feed source", e);
         } finally {
             failure = finish(failure);
@@ -168,11 +162,17 @@
     }
 
     private Throwable finish(Throwable failure) {
-        Throwable th = CleanupUtils.close(recordReader, null);
-        th = DataflowUtils.close(tupleForwarder, th);
+        Throwable th = CleanupUtils.close(recordReader, failure);
+        if (th != null) {
+            try {
+                tupleForwarder.complete();
+            } catch (Throwable completeFailure) {
+                th = completeFailure;
+            }
+        }
         closeSignal();
         setState(State.STOPPED);
-        return ExceptionUtils.suppress(failure, th);
+        return th;
     }
 
     private boolean parseAndForward(IRawRecord<? extends T> record) throws 
IOException {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 025520e..ffa42e5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,9 @@
     private final AsterixInputStream stream;
     protected long incomingRecordsCount = 0;
 
-    public FeedStreamDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, IStreamDataParser streamParser, 
AsterixInputStream inputStream) {
-        super(ctx, tupleForwarder, feedLogManager, 1);
+    public FeedStreamDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager,
+            IStreamDataParser streamParser, AsterixInputStream inputStream) {
+        super(ctx, feedLogManager, 1);
         this.dataParser = streamParser;
         this.stream = inputStream;
     }
@@ -41,7 +41,7 @@
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
-            tupleForwarder.initialize(ctx, writer);
+            tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
                 if (!parseNext()) {
                     break;
@@ -50,10 +50,9 @@
                 tupleForwarder.addTuple(tb);
                 incomingRecordsCount++;
             }
-        } catch (Exception e) {
+            tupleForwarder.complete();
+        } catch (Throwable e) {
             throw HyracksDataException.create(e);
-        } finally {
-            tupleForwarder.close();
         }
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
deleted file mode 100644
index f824b67..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FeedTupleForwarder implements ITupleForwarder {
-
-    private final FeedLogManager feedLogManager;
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private boolean paused = false;
-    private boolean initialized;
-    private boolean failed;
-
-    public FeedTupleForwarder(FeedLogManager feedLogManager) {
-        this.feedLogManager = feedLogManager;
-    }
-
-    public FeedLogManager getFeedLogManager() {
-        return feedLogManager;
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
-        if (!initialized) {
-            this.frame = new VSizeFrame(ctx);
-            this.writer = writer;
-            this.appender = new FrameTupleAppender(frame);
-            initialized = true;
-        }
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (paused) {
-            synchronized (this) {
-                while (paused) {
-                    try {
-                        wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw HyracksDataException.create(e);
-                    }
-                }
-            }
-        }
-        DataflowUtils.addTupleToFrame(appender, tb, writer);
-    }
-
-    public void pause() {
-        paused = true;
-    }
-
-    public synchronized void resume() {
-        paused = false;
-        notifyAll();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        Throwable throwable = null;
-        try {
-            if (!failed && appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        } catch (Throwable th) {
-            throwable = th;
-            throw th;
-        } finally {
-            try {
-                feedLogManager.close();
-            } catch (IOException e) {
-                if (throwable != null) {
-                    throwable.addSuppressed(e);
-                } else {
-                    throw HyracksDataException.create(e);
-                }
-            } catch (Throwable th) {
-                if (throwable != null) {
-                    throwable.addSuppressed(th);
-                } else {
-                    throw th;
-                }
-            }
-        }
-    }
-
-    public void flush() throws HyracksDataException {
-        appender.flush(writer);
-    }
-
-    public void fail() throws HyracksDataException {
-        failed = true;
-        writer.fail();
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index c7f6d9c..289c16f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -30,10 +30,9 @@
 
     protected final IRecordWithMetadataParser<T> dataParser;
 
-    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, 
IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
+    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfOutputFields,
+            IRecordWithMetadataParser<T> dataParser, IRecordReader<T> 
recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
         this.dataParser = dataParser;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
deleted file mode 100644
index 18927cd..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FrameFullTupleForwarder implements ITupleForwarder {
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        this.writer = writer;
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        boolean success = appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new 
RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index c4f75e3..b956295 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -31,10 +30,9 @@
 public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
     private final IExternalIndexer indexer;
 
-    public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
-            throws IOException {
-        super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
+    public IndexingDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+            IRecordReader<? extends T> recordReader, IExternalIndexer indexer) throws IOException {
+        super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields());
         this.indexer = indexer;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
deleted file mode 100644
index f34b77d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateControlledTupleForwarder implements ITupleForwarder {
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    private RateControlledTupleForwarder(long interTupleInterval) {
-        this.interTupleInterval = interTupleInterval;
-        delayConfigured = interTupleInterval != 0L;
-    }
-
-    // Factory method
-    public static RateControlledTupleForwarder create(Map<String, String> 
configuration) {
-        long interTupleInterval = 0L;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        }
-        return new RateControlledTupleForwarder(interTupleInterval);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        this.writer = writer;
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (delayConfigured) {
-            try {
-                Thread.sleep(interTupleInterval);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
-            }
-        }
-        boolean success = appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new 
RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 0f9572d..aebdefb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -33,9 +32,9 @@
     protected final IRecordReader<? extends T> recordReader;
     protected final int numOfTupleFields;
 
-    public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, int numOfTupleFields) {
-        super(ctx, tupleForwarder);
+    public RecordDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+            IRecordReader<? extends T> recordReader, int numOfTupleFields) {
+        super(ctx);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         this.numOfTupleFields = numOfTupleFields;
@@ -45,7 +44,7 @@
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
-            tupleForwarder.initialize(ctx, writer);
+            TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
@@ -54,7 +53,7 @@
                 appendOtherTupleFields(tb);
                 tupleForwarder.addTuple(tb);
             }
-            tupleForwarder.close();
+            tupleForwarder.complete();
             recordReader.close();
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index ccf22da..a28c484 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -28,9 +27,8 @@
 public class StreamDataFlowController extends AbstractDataFlowController {
     private final IStreamDataParser dataParser;
 
-    public StreamDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IStreamDataParser dataParser) {
-        super(ctx, tupleForwarder);
+    public StreamDataFlowController(IHyracksTaskContext ctx, IStreamDataParser dataParser) {
+        super(ctx);
         this.dataParser = dataParser;
     }
 
@@ -38,7 +36,7 @@
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-            tupleForwarder.initialize(ctx, writer);
+            TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
@@ -47,7 +45,7 @@
                 tb.addFieldEndOffset();
                 tupleForwarder.addTuple(tb);
             }
-            tupleForwarder.close();
+            tupleForwarder.complete();
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
new file mode 100644
index 0000000..afb00b0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class TupleForwarder {
+
+    private final FrameTupleAppender appender;
+    private final IFrame frame;
+    private final IFrameWriter writer;
+
+    public TupleForwarder(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        this.appender = new FrameTupleAppender(frame);
+    }
+
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        DataflowUtils.addTupleToFrame(appender, tb, writer);
+    }
+
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
+
+    public void complete() throws HyracksDataException {
+        appender.write(writer, false);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 0503677..a6c2ddd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
@@ -79,7 +80,8 @@
             
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
             run();
-        } catch (Exception e) {
+        } catch (Throwable e) {
+            CleanupUtils.fail(writer, e);
             LOGGER.log(Level.WARN, "Failure during data ingestion", e);
             throw e;
         } finally {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 78f24a5..3e53b52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -42,12 +42,10 @@
 import 
org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
 import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.om.types.ARecordType;
@@ -70,35 +68,29 @@
                     IRecordDataParserFactory<?> recordParserFactory = 
(IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = 
recordParserFactory.createRecordParser(ctx);
                     if (indexingOp) {
-                        return new IndexingDataFlowController(ctx,
-                                DataflowUtils.getTupleForwarder(configuration, 
feedLogManager), dataParser,
-                                recordReader, ((IIndexingDatasource) 
recordReader).getIndexer());
+                        return new IndexingDataFlowController(ctx, dataParser, 
recordReader,
+                                ((IIndexingDatasource) 
recordReader).getIndexer());
                     } else if (isFeed) {
-                        FeedTupleForwarder tupleForwarder =
-                                (FeedTupleForwarder) 
DataflowUtils.getTupleForwarder(configuration, feedLogManager);
                         boolean isChangeFeed = 
ExternalDataUtils.isChangeFeed(configuration);
                         boolean isRecordWithMeta = 
ExternalDataUtils.isRecordWithMeta(configuration);
                         if (isRecordWithMeta) {
                             if (isChangeFeed) {
                                 int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
-                                return new 
ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, 
(IRecordWithMetadataParser) dataParser, recordReader);
+                                return new 
ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2,
+                                        (IRecordWithMetadataParser) 
dataParser, recordReader);
                             } else {
-                                return new FeedWithMetaDataFlowController(ctx, 
tupleForwarder, feedLogManager, 2,
+                                return new FeedWithMetaDataFlowController(ctx, 
feedLogManager, 2,
                                         (IRecordWithMetadataParser) 
dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
-                            return new ChangeFeedDataFlowController(ctx, 
tupleForwarder, feedLogManager, numOfKeys + 1,
+                            return new ChangeFeedDataFlowController(ctx, 
feedLogManager, numOfKeys + 1,
                                     (IRecordWithPKDataParser) dataParser, 
recordReader);
                         } else {
-                            return new FeedRecordDataFlowController(ctx, 
tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader);
+                            return new FeedRecordDataFlowController(ctx, 
feedLogManager, 1, dataParser, recordReader);
                         }
                     } else {
-                        return new RecordDataFlowController(ctx,
-                                DataflowUtils.getTupleForwarder(configuration, 
feedLogManager), dataParser,
-                                recordReader, 1);
+                        return new RecordDataFlowController(ctx, dataParser, 
recordReader, 1);
                     }
                 case STREAM:
                     IInputStreamFactory streamFactory = (IInputStreamFactory) 
dataSourceFactory;
@@ -107,12 +99,9 @@
                     IStreamDataParser streamParser = 
streamParserFactory.createInputStreamParser(ctx, partition);
                     streamParser.setInputStream(stream);
                     if (isFeed) {
-                        return new FeedStreamDataFlowController(ctx,
-                                (FeedTupleForwarder) 
DataflowUtils.getTupleForwarder(configuration, feedLogManager),
-                                feedLogManager, streamParser, stream);
+                        return new FeedStreamDataFlowController(ctx, 
feedLogManager, streamParser, stream);
                     } else {
-                        return new StreamDataFlowController(ctx, 
DataflowUtils.getTupleForwarder(configuration, null),
-                                streamParser);
+                        return new StreamDataFlowController(ctx, streamParser);
                     }
                 default:
                     throw new 
RuntimeDataException(ErrorCode.PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 438f1df..37d400f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -18,28 +18,15 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.Map;
-
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.api.ITupleForwarder.TupleForwardPolicy;
-import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
-import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
-import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class DataflowUtils {
-    private static final Logger LOGGER = LogManager.getLogger();
 
     private DataflowUtils() {
     }
@@ -54,32 +41,6 @@
         }
     }
 
-    public static ITupleForwarder getTupleForwarder(Map<String, String> 
configuration, FeedLogManager feedLogManager)
-            throws HyracksDataException {
-        ITupleForwarder.TupleForwardPolicy policyType = null;
-        String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
-        if (ExternalDataUtils.isFeed(configuration)) {
-            // TODO pass this value in the configuration and avoid this check 
for feeds
-            policyType = TupleForwardPolicy.FEED;
-        } else if (propValue == null) {
-            policyType = TupleForwardPolicy.FRAME_FULL;
-        } else {
-            policyType = 
TupleForwardPolicy.valueOf(propValue.trim().toUpperCase());
-        }
-        switch (policyType) {
-            case FEED:
-                return new FeedTupleForwarder(feedLogManager);
-            case FRAME_FULL:
-                return new FrameFullTupleForwarder();
-            case COUNTER_TIMER_EXPIRED:
-                return CounterTimerTupleForwarder.create(configuration);
-            case RATE_CONTROLLED:
-                return RateControlledTupleForwarder.create(configuration);
-            default:
-                throw new 
RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_UNKNOWN_FORWARD_POLICY);
-        }
-    }
-
     public static void addTupleToFrame(FrameTupleAppender appender, 
ITupleReference tuple, IFrameWriter writer)
             throws HyracksDataException {
         if (!appender.append(tuple)) {
@@ -88,31 +49,5 @@
                 throw new 
RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE);
             }
         }
-    }
-
-    /**
-     * Close the ITupleForwarder and suppress any Throwable thrown by the 
close call.
-     * This method must NEVER throw any Throwable
-     *
-     * @param indexHelper
-     *            the indexHelper to close
-     * @param root
-     *            the first exception encountered during release of resources
-     * @return the root Throwable if not null or a new Throwable if any was 
thrown, otherwise, it returns null
-     */
-    public static Throwable close(ITupleForwarder tupleForwarder, Throwable 
root) {
-        if (tupleForwarder != null) {
-            try {
-                tupleForwarder.close();
-            } catch (Throwable th) { // NOSONAR Will be re-thrown
-                try {
-                    LOGGER.log(Level.WARN, "Failure closing a closeable 
resource", th);
-                } catch (Throwable ignore) { // NOSONAR Logging exception will 
be ignored
-                    // NOSONAR ignore
-                }
-                root = ExceptionUtils.suppress(root, th);
-            }
-        }
-        return root;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 9d887b6..8bafb0f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -20,6 +20,7 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -35,7 +36,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedLogManager {
+public class FeedLogManager implements Closeable {
 
     public enum LogEntryType {
         START, // partition start
@@ -64,7 +65,7 @@
     public FeedLogManager(File file) throws HyracksDataException {
         try {
             this.dir = file.toPath();
-            this.completed = new TreeSet<String>();
+            this.completed = new TreeSet<>();
             if (!exists()) {
                 create();
             }
@@ -122,6 +123,7 @@
                 StandardCharsets.UTF_8, StandardOpenOption.APPEND);
     }
 
+    @Override
     public synchronized void close() throws IOException {
         count--;
         if (count > 0) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 6fe938c..8ee8a57 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -28,11 +28,8 @@
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.ITupleForwarder;
+import org.apache.asterix.external.dataflow.TupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -79,26 +76,19 @@
             @Override
             public ITupleParser createTupleParser(IHyracksTaskContext ctx) 
throws HyracksDataException {
                 ADMDataParser parser;
-                ITupleForwarder forwarder;
+
                 ArrayTupleBuilder tb;
                 IApplicationContext appCtx =
                         (IApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
                 ClusterPartition nodePartition = 
appCtx.getMetadataProperties().getNodePartitions().get(nodeId)[0];
                 parser = new ADMDataParser(outputType, true);
-                forwarder =
-                        DataflowUtils
-                                .getTupleForwarder(configuration,
-                                        FeedUtils.getFeedLogManager(ctx, 
FeedUtils.splitsForAdapter(
-                                                
ExternalDataUtils.getDataverse(configuration),
-                                                
ExternalDataUtils.getFeedName(configuration), nodeId, nodePartition)));
                 tb = new ArrayTupleBuilder(1);
                 return new ITupleParser() {
-
                     @Override
                     public void parse(InputStream in, IFrameWriter writer) 
throws HyracksDataException {
                         try {
                             parser.setInputStream(in);
-                            forwarder.initialize(ctx, writer);
+                            TupleForwarder forwarder = new TupleForwarder(ctx, 
writer);
                             while (true) {
                                 tb.reset();
                                 if (!parser.parse(tb.getDataOutput())) {
@@ -107,7 +97,7 @@
                                 tb.addFieldEndOffset();
                                 forwarder.addTuple(tb);
                             }
-                            forwarder.close();
+                            forwarder.complete();
                         } catch (Exception e) {
                             throw HyracksDataException.create(e);
                         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
index bf54e01..a116320 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -107,11 +107,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        writer.close();
-        downstreamOpen = false;
         if (downstreamFailed && !failCalledByUpstream) {
             throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
         }
+        writer.close();
+        downstreamOpen = false;
     }
 
     public static IFrameWriter enforce(IFrameWriter writer) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2463
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ife679fb9643dc6b39d035e0eecdb915b227503a5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to