http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 5b9b96f..e24c26d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -32,16 +34,20 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowController {
+    public enum State {
+        CREATED,
+        STARTED,
+        STOPPED
+    }
+
     private static final Logger LOGGER = 
Logger.getLogger(FeedRecordDataFlowController.class.getName());
     private final IRecordDataParser<T> dataParser;
     private final IRecordReader<T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected static final long INTERVAL = 1000;
-    protected boolean failed = false;
+    protected State state = State.CREATED;
     protected long incomingRecordsCount = 0;
     protected long failedRecordsCount = 0;
 
@@ -57,8 +63,15 @@ public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowControl
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException, 
InterruptedException {
+        synchronized (this) {
+            if (state == State.STOPPED) {
+                return;
+            } else {
+                setState(State.STARTED);
+            }
+        }
+        Exception failure = null;
         try {
-            failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (hasNext()) {
                 IRawRecord<? extends T> record = next();
@@ -74,21 +87,48 @@ public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowControl
                 }
             }
         } catch (HyracksDataException e) {
-            LOGGER.log(Level.WARN, e);
+            LOGGER.log(Level.WARNING, "Exception during ingestion", e);
             //if interrupted while waiting for a new record, then it is safe 
to not fail forward
             if (e.getComponent() == ErrorCode.ASTERIX
-                    && e.getErrorCode() == 
ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) {
-                // Do nothing
+                    && (e.getErrorCode() == 
ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
+                // Do nothing. interrupted by the active manager
+            } else if (e.getComponent() == ErrorCode.ASTERIX
+                    && (e.getErrorCode() == 
ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
+                // Failure but we know we can for sure push the previously 
parsed records safely
+                failure = e;
+                try {
+                    flush();
+                } catch (Exception flushException) {
+                    tupleForwarder.fail();
+                    flushException.addSuppressed(e);
+                    failure = flushException;
+                }
             } else {
-                failed = true;
-                throw e;
+                failure = e;
+                tupleForwarder.fail();
             }
         } catch (Exception e) {
-            failed = true;
-            LOGGER.warn("Failure while operating a feed source", e);
-            throw HyracksDataException.create(e);
+            failure = e;
+            tupleForwarder.fail();
+            LOGGER.log(Level.WARNING, "Failure while operating a feed source", 
e);
+        } finally {
+            failure = finish(failure);
+        }
+        if (failure != null) {
+            if (failure instanceof InterruptedException) {
+                throw (InterruptedException) failure;
+            }
+            throw HyracksDataException.create(failure);
         }
-        finish();
+    }
+
+    private synchronized void setState(State newState) {
+        LOGGER.log(Level.INFO, "State is being set from " + state + " to " + 
newState);
+        state = newState;
+    }
+
+    public synchronized State getState() {
+        return state;
     }
 
     private IRawRecord<? extends T> next() throws HyracksDataException {
@@ -97,47 +137,58 @@ public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowControl
         } catch (InterruptedException e) { // NOSONAR Gracefully handling 
interrupt to push records in the pipeline
             throw new 
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
         } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            if (!recordReader.handleException(e)) {
+                throw new 
RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+            }
+            return null;
         }
     }
 
     private boolean hasNext() throws HyracksDataException {
-        boolean hasNext;
-        try {
-            hasNext = recordReader.hasNext();
-        } catch (InterruptedException e) { // NOSONAR Gracefully handling 
interrupt to push records in the pipeline
-            throw new 
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        while (true) {
+            try {
+                return recordReader.hasNext();
+            } catch (InterruptedException e) { // NOSONAR Gracefully handling 
interrupt to push records in the pipeline
+                throw new 
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+            } catch (Exception e) {
+                if (!recordReader.handleException(e)) {
+                    throw new 
RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+                }
+            }
         }
-        return hasNext;
     }
 
-    private void finish() throws HyracksDataException {
+    private Exception finish(Exception failure) {
         HyracksDataException hde = null;
         try {
-            tupleForwarder.close();
-        } catch (Throwable th) {
+            recordReader.close();
+        } catch (Exception th) {
+            LOGGER.log(Level.WARNING, "Failure during while operating a feed 
source", th);
             hde = HyracksDataException.suppress(hde, th);
         }
         try {
-            recordReader.close();
-        } catch (Throwable th) {
-            LOGGER.warn("Failure during while operating a feed sourcec", th);
+            tupleForwarder.close();
+        } catch (Exception th) {
             hde = HyracksDataException.suppress(hde, th);
         } finally {
             closeSignal();
         }
+        setState(State.STOPPED);
         if (hde != null) {
-            throw hde;
+            if (failure != null) {
+                failure.addSuppressed(hde);
+            } else {
+                return hde;
+            }
         }
+        return failure;
     }
 
     private boolean parseAndForward(IRawRecord<? extends T> record) throws 
IOException {
         try {
             dataParser.parse(record, tb.getDataOutput());
         } catch (Exception e) {
-            LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+            LOGGER.log(Level.WARNING, 
ExternalDataConstants.ERROR_PARSE_RECORD, e);
             feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
             // continue the outer loop
             return false;
@@ -172,44 +223,31 @@ public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowControl
 
     @Override
     public boolean stop() throws HyracksDataException {
-        HyracksDataException hde = null;
+        synchronized (this) {
+            switch (state) {
+                case CREATED:
+                case STOPPED:
+                    setState(State.STOPPED);
+                    return true;
+                case STARTED:
+                    break;
+                default:
+                    throw new HyracksDataException("unknown state " + state);
+
+            }
+        }
         if (recordReader.stop()) {
-            if (failed) {
-                // failed, close here
-                try {
-                    tupleForwarder.close();
-                } catch (Throwable th) {
-                    hde = HyracksDataException.suppress(hde, th);
-                }
-                try {
-                    recordReader.close();
-                } catch (Throwable th) {
-                    hde = HyracksDataException.suppress(hde, th);
-                }
-                if (hde != null) {
-                    throw hde;
-                }
-            } else {
-                try {
-                    waitForSignal();
-                } catch (InterruptedException e) {
-                    throw HyracksDataException.create(e);
-                }
+            try {
+                waitForSignal();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
             return true;
         }
         return false;
     }
 
-    @Override
-    public boolean handleException(Throwable th) throws HyracksDataException {
-        // This is not a parser record. most likely, this error happened in 
the record reader.
-        if (!recordReader.handleException(th)) {
-            finish();
-        }
-        return !closed.get();
-    }
-
     public IRecordReader<T> getReader() {
         return recordReader;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index cad11cd..1f1f545 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -43,8 +43,7 @@ public class FeedStreamDataFlowController extends 
AbstractFeedDataFlowController
         try {
             tupleForwarder.initialize(ctx, writer);
             while (true) {
-                tb.reset();
-                if (!dataParser.parse(tb.getDataOutput())) {
+                if (!parseNext()) {
                     break;
                 }
                 tb.addFieldEndOffset();
@@ -52,12 +51,25 @@ public class FeedStreamDataFlowController extends 
AbstractFeedDataFlowController
                 incomingRecordsCount++;
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } finally {
             tupleForwarder.close();
         }
     }
 
+    private boolean parseNext() throws HyracksDataException {
+        while (true) {
+            try {
+                tb.reset();
+                return dataParser.parse(tb.getDataOutput());
+            } catch (Exception e) {
+                if (!handleException(e)) {
+                    throw e;
+                }
+            }
+        }
+    }
+
     @Override
     public boolean stop() throws HyracksDataException {
         try {
@@ -71,8 +83,7 @@ public class FeedStreamDataFlowController extends 
AbstractFeedDataFlowController
         return false;
     }
 
-    @Override
-    public boolean handleException(Throwable th) {
+    private boolean handleException(Throwable th) {
         boolean handled = true;
         try {
             handled &= stream.handleException(th);
@@ -86,6 +97,7 @@ public class FeedStreamDataFlowController extends 
AbstractFeedDataFlowController
         return handled;
     }
 
+    @Override
     public String getStats() {
         return "{\"incoming-records-number\": " + incomingRecordsCount + "}";
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 3a8130b..f824b67 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -103,7 +103,7 @@ public class FeedTupleForwarder implements ITupleForwarder {
                 if (throwable != null) {
                     throwable.addSuppressed(e);
                 } else {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             } catch (Throwable th) {
                 if (throwable != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index 2b06775..c4f75e3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -32,8 +32,8 @@ public class IndexingDataFlowController<T> extends 
RecordDataFlowController<T> {
     private final IExternalIndexer indexer;
 
     public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder 
tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> 
recordReader,
-            IExternalIndexer indexer) throws IOException {
+            IRecordDataParser<T> dataParser, IRecordReader<? extends T> 
recordReader, IExternalIndexer indexer)
+            throws IOException {
         super(ctx, tupleForwarder, dataParser, recordReader, 1 + 
indexer.getNumberOfFields());
         this.indexer = indexer;
     }
@@ -43,7 +43,7 @@ public class IndexingDataFlowController<T> extends 
RecordDataFlowController<T> {
         try {
             indexer.index(tb);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index eb5527f..f34b77d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -71,7 +71,8 @@ public class RateControlledTupleForwarder implements 
ITupleForwarder {
             try {
                 Thread.sleep(interTupleInterval);
             } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
         }
         boolean success = appender.append(tb.getFieldEndOffsets(), 
tb.getByteArray(), 0, tb.getSize());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 9f32a25..eeda80c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -24,7 +24,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class FeedAdapter implements IDataSourceAdapter {
-    private static final long serialVersionUID = 1L;
     private final AbstractFeedDataFlowController controller;
 
     public FeedAdapter(AbstractFeedDataFlowController controller) {
@@ -40,10 +39,6 @@ public class FeedAdapter implements IDataSourceAdapter {
         return controller.stop();
     }
 
-    public boolean handleException(Throwable e) throws HyracksDataException {
-        return controller.handleException(e);
-    }
-
     public boolean pause() throws HyracksDataException {
         return controller.pause();
     }
@@ -56,7 +51,7 @@ public class FeedAdapter implements IDataSourceAdapter {
         return controller.getStats();
     }
 
-    public void fail() throws HyracksDataException {
-        controller.fail();
+    public AbstractFeedDataFlowController getController() {
+        return controller;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0681d71..916fe0a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -25,7 +25,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class GenericAdapter implements IDataSourceAdapter {
 
-    private static final long serialVersionUID = 1L;
     private final IDataFlowController controller;
 
     public GenericAdapter(IDataFlowController controller) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
deleted file mode 100644
index d102d0c..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-/**
- * The class in charge of executing feed adapters.
- */
-public class AdapterExecutor implements Runnable {
-
-    private static final Logger LOGGER = 
Logger.getLogger(AdapterExecutor.class.getName());
-
-    private final IFrameWriter writer; // A writer that sends frames to 
multiple receivers (that can
-    // increase or decrease at any time)
-    private final FeedAdapter adapter; // The adapter
-    private final AdapterRuntimeManager adapterManager;// The runtime manager 
<-- two way visibility -->
-    private int restartCount = 0;
-
-    public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, 
AdapterRuntimeManager adapterManager) {
-        this.writer = writer;
-        this.adapter = adapter;
-        this.adapterManager = adapterManager;
-    }
-
-    @Override
-    public void run() {
-        // Start by getting the partition number from the manager
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Starting ingestion for partition:" + 
adapterManager.getPartition());
-        }
-        boolean failed = false;
-        try {
-            failed = doRun();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            failed = true;
-            LOGGER.error("Unhandled Exception", e);
-        } finally {
-            // Done with the adapter. about to close, setting the stage based 
on the failed ingestion flag and notifying
-            // the runtime manager
-            adapterManager.setFailed(failed);
-            adapterManager.setDone(true);
-            synchronized (adapterManager) {
-                adapterManager.notifyAll();
-            }
-        }
-    }
-
-    private boolean doRun() throws HyracksDataException, InterruptedException {
-        boolean continueIngestion = true;
-        boolean failedIngestion = false;
-        while (continueIngestion) {
-            try {
-                // Start the adapter
-                adapter.start(adapterManager.getPartition(), writer);
-                // Adapter has completed execution
-                continueIngestion = false;
-            } catch (InterruptedException e) {
-                adapter.fail();
-                throw e;
-            } catch (Exception e) {
-                LOGGER.error("Exception during feed ingestion ", e);
-                continueIngestion = adapter.handleException(e);
-                if (!continueIngestion) {
-                    adapter.fail();
-                }
-                failedIngestion = !continueIngestion;
-                restartCount++;
-            }
-        }
-        return failedIngestion;
-    }
-
-    public String getStats() {
-        return "{\"adapter-stats\": " + adapter.getStats() + ", 
\"executor-restart-times\": " + restartCount + "}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
deleted file mode 100644
index 1b5eeac..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * This class manages the execution of an adapter within a feed
- */
-public class AdapterRuntimeManager {
-
-    private static final Logger LOGGER = 
Logger.getLogger(AdapterRuntimeManager.class.getName());
-
-    private final EntityId feedId; // (dataverse-feed)
-
-    private final FeedAdapter feedAdapter; // The adapter
-
-    private final AdapterExecutor adapterExecutor; // The executor for the 
adapter
-
-    private final int partition; // The partition number
-
-    private final IHyracksTaskContext ctx;
-
-    private Future<?> execution;
-
-    private boolean started = false;
-    private volatile boolean done = false;
-    private volatile boolean failed = false;
-
-    public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, 
FeedAdapter feedAdapter,
-            IFrameWriter writer, int partition) {
-        this.ctx = ctx;
-        this.feedId = entityId;
-        this.feedAdapter = feedAdapter;
-        this.partition = partition;
-        this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
-    }
-
-    public void start() {
-        synchronized (adapterExecutor) {
-            started = true;
-            if (!done) {
-                execution = ctx.getExecutorService().submit(adapterExecutor);
-            } else {
-                LOGGER.log(Level.WARNING, "Someone stopped me before I even 
start. I will simply not start");
-            }
-        }
-    }
-
-    public void stop() throws HyracksDataException, InterruptedException {
-        synchronized (adapterExecutor) {
-            try {
-                if (started) {
-                    try {
-                        ctx.getExecutorService().submit(() -> {
-                            if (feedAdapter.stop()) {
-                                execution.get();
-                            }
-                            return null;
-                        }).get(30, TimeUnit.SECONDS);
-                    } catch (InterruptedException e) {
-                        LOGGER.log(Level.WARNING, "Interrupted while trying to 
stop an adapter runtime", e);
-                        throw e;
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARNING, "Exception while trying to 
stop an adapter runtime", e);
-                        throw HyracksDataException.create(e);
-                    } finally {
-                        execution.cancel(true);
-                    }
-                } else {
-                    LOGGER.log(Level.WARNING, "Adapter executor was stopped 
before it starts");
-                }
-            } finally {
-                done = true;
-            }
-        }
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public String toString() {
-        return feedId + "[" + partition + "]";
-    }
-
-    public FeedAdapter getFeedAdapter() {
-        return feedAdapter;
-    }
-
-    public AdapterExecutor getAdapterExecutor() {
-        return adapterExecutor;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public boolean isFailed() {
-        return failed;
-    }
-
-    public void setFailed(boolean failed) {
-        this.failed = failed;
-    }
-
-    public boolean isDone() {
-        return done;
-    }
-
-    public void setDone(boolean done) {
-        this.done = done;
-    }
-
-    public String getStats() {
-        return adapterExecutor.getStats();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index cbf784e..982cf5b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -137,7 +137,7 @@ public class HDFSDataSourceFactory implements 
IRecordReaderFactory<Object>, IInd
             restoreConfig(ctx);
             return new HDFSInputStream(read, inputSplits, readSchedule, 
nodeName, conf, configuration, files, indexer);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00ac090..c5ca129 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -21,16 +21,17 @@ package org.apache.asterix.external.input.stream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
 
 public class LocalFSInputStream extends AsterixInputStream {
 
@@ -155,24 +156,25 @@ public class LocalFSInputStream extends 
AsterixInputStream {
         if (in == null) {
             return false;
         }
-        if (th instanceof HyracksDataException
-                && ((HyracksDataException) th).getErrorCode() == 
ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
+        Throwable root = ExceptionUtils.getRootCause(th);
+        if (root instanceof HyracksDataException
+                && ((HyracksDataException) root).getErrorCode() == 
ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
             if (currentFile != null) {
                 try {
                     logManager.logRecord(currentFile.getAbsolutePath(), 
"Corrupted input file");
                 } catch (IOException e) {
-                    LOGGER.warn("Filed to write to feed log file", e);
+                    LOGGER.log(Level.WARNING, "Filed to write to feed log 
file", e);
                 }
-                LOGGER.warn("Corrupted input file: " + 
currentFile.getAbsolutePath());
+                LOGGER.log(Level.WARNING, "Corrupted input file: " + 
currentFile.getAbsolutePath());
             }
             try {
                 advance();
                 return true;
             } catch (Exception e) {
-                LOGGER.warn("An exception was thrown while trying to skip a 
file", e);
+                LOGGER.log(Level.WARNING, "An exception was thrown while 
trying to skip a file", e);
             }
         }
-        LOGGER.warn("Failed to recover from failure", th);
+        LOGGER.log(Level.WARNING, "Failed to recover from failure", th);
         return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 9a0e718..caeaa07 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -84,7 +84,7 @@ public class SocketClientInputStreamFactory implements 
IInputStreamFactory {
         try {
             return new SocketClientInputStream(sockets.get(partition));
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 05931b2..1f1fa5c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -124,7 +124,7 @@ public class SocketServerInputStreamFactory implements 
IInputStreamFactory {
             server.bind(new InetSocketAddress(socket.second));
             return new SocketServerInputStream(server);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index b32006c..12be449 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -97,7 +97,7 @@ public class TwitterFirehoseStreamFactory implements 
IInputStreamFactory {
         try {
             return new TwitterFirehoseInputStream(configuration, partition);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 0d485924..2b5e248 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -23,6 +23,8 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -95,6 +97,11 @@ import org.apache.hyracks.util.string.UTF8StringReader;
 
 public class JObjectAccessors {
 
+    private static final Logger LOGGER = 
Logger.getLogger(JObjectAccessors.class.getName());
+
+    private JObjectAccessors() {
+    }
+
     public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag 
aTypeTag) {
         IJObjectAccessor accessor = null;
         switch (aTypeTag) {
@@ -200,18 +207,16 @@ public class JObjectAccessors {
         @Override
         public IJObject access(IVisitablePointable pointable, 
IObjectPool<IJObject, IAType> objPool)
                 throws HyracksDataException {
-            IJObject jObject = objPool.allocate(BuiltinType.ANULL);
-            return jObject;
+            return objPool.allocate(BuiltinType.ANULL);
         }
     }
 
-    public static class JMissingAccessor implements  IJObjectAccessor {
+    public static class JMissingAccessor implements IJObjectAccessor {
 
         @Override
         public IJObject access(IVisitablePointable pointable, 
IObjectPool<IJObject, IAType> objPool)
                 throws HyracksDataException {
-            IJObject jObject = objPool.allocate(BuiltinType.AMISSING);
-            return jObject;
+            return objPool.allocate(BuiltinType.AMISSING);
         }
     }
 
@@ -271,7 +276,7 @@ public class JObjectAccessors {
             try {
                 v = reader.readUTF(new DataInputStream(new 
ByteArrayInputStream(b, s + 1, l - 1)));
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
             JObjectUtil.getNormalizedString(v);
 
@@ -539,8 +544,8 @@ public class JObjectAccessors {
                 }
 
             } catch (Exception e) {
-                e.printStackTrace();
-                throw new HyracksDataException(e);
+                LOGGER.log(Level.WARNING, "Failure while accessing a java 
record", e);
+                throw HyracksDataException.create(e);
             }
             return jRecord;
         }
@@ -593,7 +598,7 @@ public class JObjectAccessors {
                     list.add(listItem);
                 }
             } catch (AsterixException exception) {
-                throw new HyracksDataException(exception);
+                throw HyracksDataException.create(exception);
             }
             return list;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 45d424e..242773e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -89,7 +89,7 @@ public class ExternalIndexBulkModifyOperatorNodePushable 
extends IndexBulkLoadOp
             try {
                 bulkLoader.end();
             } catch (Throwable th) {
-                throw new HyracksDataException(th);
+                throw HyracksDataException.create(th);
             } finally {
                 try {
                     indexHelper.close();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 6299982..c096f69 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -89,18 +89,21 @@ public class ExternalLookupOperatorDescriptor extends 
AbstractSingleActivityOper
                     try {
                         snapshotAccessor.close();
                     } catch (Throwable th) {
-                        hde = new HyracksDataException(th);
+                        hde = HyracksDataException.create(th);
                     }
                     try {
                         adapter.close();
                     } catch (Throwable th) {
                         if (hde == null) {
-                            hde = new HyracksDataException(th);
+                            hde = HyracksDataException.create(th);
                         } else {
                             hde.addSuppressed(th);
                         }
                     }
                 }
+                if (hde != null) {
+                    throw hde;
+                }
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 3a06a2b..770e978 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -96,8 +96,7 @@ public class FeedIntakeOperatorDescriptor extends 
AbstractSingleActivityOperator
         if (adaptorFactory == null) {
             adaptorFactory = createExternalAdapterFactory(ctx);
         }
-        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, 
partition, policyAccessor,
-                recordDescProvider, this);
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, 
partition, recordDescProvider, this);
     }
 
     private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext 
ctx) throws HyracksDataException {
@@ -112,7 +111,7 @@ public class FeedIntakeOperatorDescriptor extends 
AbstractSingleActivityOperator
                 adapterFactory.setOutputType(adapterOutputType);
                 
adapterFactory.configure(ctx.getJobletContext().getServiceContext(), 
adaptorConfiguration);
             } catch (Exception e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
         } else {
             RuntimeDataException err = new RuntimeDataException(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8c6a420..16b8fba 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,15 +18,14 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -42,74 +41,97 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
  * The artifacts are lazily activated when a feed receives a subscription 
request.
  */
 public class FeedIntakeOperatorNodePushable extends 
ActiveSourceOperatorNodePushable {
-
-    private final int partition;
-    private final IAdapterFactory adapterFactory;
+    private static final Logger LOGGER = 
Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
     private final FeedIntakeOperatorDescriptor opDesc;
-    private volatile AdapterRuntimeManager adapterRuntimeManager;
+    private final FeedAdapter adapter;
+    private boolean poisoned = false;
 
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId 
feedId, IAdapterFactory adapterFactory,
-            int partition, FeedPolicyAccessor policyAccessor, 
IRecordDescriptorProvider recordDescProvider,
-            FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
+            int partition, IRecordDescriptorProvider recordDescProvider,
+            FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws 
HyracksDataException {
         super(ctx, new ActiveRuntimeId(feedId, 
FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         this.opDesc = feedIntakeOperatorDescriptor;
         this.recordDesc = 
recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
-        this.partition = partition;
-        this.adapterFactory = adapterFactory;
+        adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, 
runtimeId.getPartition());
     }
 
     @Override
     protected void start() throws HyracksDataException, InterruptedException {
+        String before = Thread.currentThread().getName();
+        Thread.currentThread().setName("Intake Thread");
         try {
             writer.open();
-            Thread.currentThread().setName("Intake Thread");
-            FeedAdapter adapter = (FeedAdapter) 
adapterFactory.createAdapter(ctx, partition);
-            adapterRuntimeManager = new AdapterRuntimeManager(ctx, 
runtimeId.getEntityId(), adapter, writer, partition);
-            IFrame message = new VSizeFrame(ctx);
-            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+            synchronized (this) {
+                if (poisoned) {
+                    return;
+                }
+            }
             /*
              * Set null feed message. Feed pipeline carries with it a message 
with each frame
              * Initially, the message is set to a null message that can be 
changed by feed adapters.
              * One use case is adapters which consume data sources that allow 
restartability. Such adapters
              * can propagate progress information through the ingestion 
pipeline to storage nodes
              */
+            IFrame message = new VSizeFrame(ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
-            adapterRuntimeManager.start();
-            synchronized (adapterRuntimeManager) {
-                while (!adapterRuntimeManager.isDone()) {
-                    adapterRuntimeManager.wait();
-                }
-            }
-            if (adapterRuntimeManager.isFailed()) {
-                throw new RuntimeDataException(
-                        
ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
-            }
+            run();
         } catch (Exception e) {
-            /*
-             * An Interrupted Exception is thrown if the Intake job cannot 
progress further due to failure of another
-             * node involved in the Hyracks job. As the Intake job involves 
only the intake operator, the exception is
-             * indicative of a failure at the sibling intake operator 
location. The surviving intake partitions must
-             * continue to live and receive data from the external source.
-             */
-            writer.fail();
+            LOGGER.log(Level.WARNING, "Failure during data ingestion", e);
             throw e;
         } finally {
             writer.close();
+            Thread.currentThread().setName(before);
+        }
+    }
+
+    private void run() throws HyracksDataException {
+        // Start by getting the partition number from the manager
+        LOGGER.info("Starting ingestion for partition:" + 
ctx.getTaskAttemptId().getTaskId().getPartition());
+        try {
+            doRun();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Unhandled Exception", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void doRun() throws HyracksDataException, InterruptedException {
+        while (true) {
+            try {
+                // Start the adapter
+                
adapter.start(ctx.getTaskAttemptId().getTaskId().getPartition(), writer);
+                // Adapter has completed execution
+                return;
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Exception during feed ingestion ", 
e);
+                throw HyracksDataException.create(e);
+            }
         }
     }
 
     @Override
     protected void abort() throws HyracksDataException, InterruptedException {
-        if (adapterRuntimeManager != null) {
-            adapterRuntimeManager.stop();
+        LOGGER.info(runtimeId + " aborting...");
+        synchronized (this) {
+            poisoned = true;
+            if (!adapter.stop()) {
+                LOGGER.info(runtimeId + " failed to stop adapter. interrupting 
the thread...");
+                taskThread.interrupt();
+            }
         }
     }
 
     @Override
     public String getStats() {
-        if (adapterRuntimeManager != null) {
-            return adapterRuntimeManager.getStats();
+        if (adapter != null) {
+            return "{\"adapter-stats\": " + adapter.getStats() + "}";
         } else {
             return "\"Runtime stats is not available.\"";
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
index e965dce..5fc9df3 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
@@ -1849,7 +1849,7 @@ public class BuiltinClassAdFunctions {
             result.setBooleanValue(false);
 
             List<String> list0 = objectPool.stringArrayListPool.get();
-            Set<String> set1 = new HashSet<String>();
+            Set<String> set1 = new HashSet<>();
 
             split_string_list(str0, have_delimiter ? 
delimiter_string.charAt(0) : ',', list0);
             split_string_set(str1, have_delimiter ? delimiter_string.charAt(0) 
: ',', set1);
@@ -1943,7 +1943,7 @@ public class BuiltinClassAdFunctions {
                     return true;
                 }
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
             state.decrementDepth();
             expr.setParentScope(state.getCurAd());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index c362969..2273bea 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -39,8 +39,6 @@ import 
org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
 public class TestTypedAdapter extends FeedAdapter {
 
-    private static final long serialVersionUID = 1L;
-
     private final PipedOutputStream pos;
 
     private final PipedInputStream pis;
@@ -145,11 +143,6 @@ public class TestTypedAdapter extends FeedAdapter {
     }
 
     @Override
-    public boolean handleException(Throwable e) {
-        return false;
-    }
-
-    @Override
     public boolean pause() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 1c28940..5262e1f 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -106,7 +106,7 @@ public class TestTypedAdapterFactory implements 
IAdapterFactory {
                             }
                             forwarder.close();
                         } catch (Exception e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };
@@ -115,7 +115,7 @@ public class TestTypedAdapterFactory implements 
IAdapterFactory {
         try {
             return new TestTypedAdapter(tupleParserFactory, outputType, ctx, 
configuration, partition);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index f64206e..f15b1e5 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -95,14 +95,14 @@ public abstract class AbstractListBuilder implements 
IAsterixListBuilder {
                 this.outputStream.write(data, start + 1, len - 1);
             }
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
     private boolean toWriteTag(byte serializedTypeTag) {
         boolean toWriteTag = itemTypeTag == ATypeTag.ANY;
-        toWriteTag = toWriteTag
-                || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == 
ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+        toWriteTag =
+                toWriteTag || (itemTypeTag == ATypeTag.NULL && 
serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG);
         return toWriteTag
                 || (itemTypeTag == ATypeTag.MISSING && serializedTypeTag == 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
index 111557a..5df04f8 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
@@ -118,17 +118,17 @@ public class RecordAddFieldsDescriptor extends 
AbstractScalarFunctionDynamicDesc
                     private final RecordBuilder recordBuilder = new 
RecordBuilder();
                     private final RuntimeRecordTypeInfo requiredRecordTypeInfo 
= new RuntimeRecordTypeInfo();
 
-                    private final IBinaryHashFunction putHashFunc = 
ListItemBinaryHashFunctionFactory.INSTANCE
-                            .createBinaryHashFunction();
-                    private final IBinaryHashFunction getHashFunc = 
ListItemBinaryHashFunctionFactory.INSTANCE
-                            .createBinaryHashFunction();
+                    private final IBinaryHashFunction putHashFunc =
+                            
ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
+                    private final IBinaryHashFunction getHashFunc =
+                            
ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
                     private final BinaryEntry keyEntry = new BinaryEntry();
                     private final BinaryEntry valEntry = new BinaryEntry();
                     private final IVisitablePointable tempValReference = 
allocator.allocateEmpty();
-                    private final IBinaryComparator cmp = 
ListItemBinaryComparatorFactory.INSTANCE
-                            .createBinaryComparator();
-                    private BinaryHashMap hashMap = new 
BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc,
-                            getHashFunc, cmp);
+                    private final IBinaryComparator cmp =
+                            
ListItemBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+                    private BinaryHashMap hashMap =
+                            new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, 
putHashFunc, getHashFunc, cmp);
                     private ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
                     private DataOutput out = resultStorage.getDataOutput();
 
@@ -157,7 +157,6 @@ public class RecordAddFieldsDescriptor extends 
AbstractScalarFunctionDynamicDesc
                         vp0.set(argPtr0);
                         vp1.set(argPtr1);
 
-
                         ARecordVisitablePointable recordPointable = 
(ARecordVisitablePointable) vp0;
                         AListVisitablePointable listPointable = 
(AListVisitablePointable) vp1;
 
@@ -207,10 +206,10 @@ public class RecordAddFieldsDescriptor extends 
AbstractScalarFunctionDynamicDesc
                                     throw new AsterixException("Expected list 
of record, got "
                                             + 
PointableHelper.getTypeTag(inputFields.get(i)));
                                 }
-                                List<IVisitablePointable> names = 
((ARecordVisitablePointable) inputFields.get(i))
-                                        .getFieldNames();
-                                List<IVisitablePointable> values = 
((ARecordVisitablePointable) inputFields.get(i))
-                                        .getFieldValues();
+                                List<IVisitablePointable> names =
+                                        ((ARecordVisitablePointable) 
inputFields.get(i)).getFieldNames();
+                                List<IVisitablePointable> values =
+                                        ((ARecordVisitablePointable) 
inputFields.get(i)).getFieldValues();
 
                                 // Get name and value of the field to be added
                                 // Use loop to account for the cases where 
users switches the order of the fields
@@ -241,8 +240,7 @@ public class RecordAddFieldsDescriptor extends 
AbstractScalarFunctionDynamicDesc
                                     tempValReference.set(entry.getBuf(), 
entry.getOffset(), entry.getLength());
                                     // If value is not equal throw conflicting 
duplicate field, otherwise ignore
                                     if 
(!PointableHelper.byteArrayEqual(valuePointable, tempValReference)) {
-                                        throw new 
RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME,
-                                                getIdentifier());
+                                        throw new 
RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
                                     }
                                 } else {
                                     if (pos > -1) {
@@ -256,7 +254,7 @@ public class RecordAddFieldsDescriptor extends 
AbstractScalarFunctionDynamicDesc
                                 }
                             }
                         } catch (AsterixException e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 3957c06..f2deb74 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -72,8 +72,8 @@ public class FlushDatasetOperatorDescriptor extends 
AbstractSingleActivityOperat
             @Override
             public void close() throws HyracksDataException {
                 try {
-                    INcApplicationContext appCtx = (INcApplicationContext) 
ctx.getJobletContext()
-                            .getServiceContext().getApplicationContext();
+                    INcApplicationContext appCtx =
+                            (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
                     IDatasetLifecycleManager datasetLifeCycleManager = 
appCtx.getDatasetLifecycleManager();
                     ILockManager lockManager = 
appCtx.getTransactionSubsystem().getLockManager();
                     ITransactionManager txnManager = 
appCtx.getTransactionSubsystem().getTransactionManager();
@@ -84,7 +84,7 @@ public class FlushDatasetOperatorDescriptor extends 
AbstractSingleActivityOperat
                     // flush the dataset synchronously
                     datasetLifeCycleManager.flushDataset(datasetId.getId(), 
false);
                 } catch (ACIDException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index b835b3a..b7a4c14 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -65,12 +65,12 @@ public class GlobalResourceIdFactory implements 
IResourceIdFactory {
                 ((INCMessageBroker) 
serviceCtx.getMessageBroker()).sendMessageToCC(msg);
                 reponse = resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
-                    throw new 
HyracksDataException(reponse.getException().getMessage());
+                    throw HyracksDataException.create(reponse.getException());
                 }
             }
             return reponse.getResourceId();
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index ab7d657..b22a257 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -395,10 +395,11 @@ public class ClusterStateManager implements 
IClusterStateManager {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (!activeNcConfiguration.containsKey(nodeId)) {
+        if (activeNcConfiguration.containsKey(nodeId)) {
+            pendingRemoval.add(nodeId);
+        } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for 
pending removal");
         }
-        pendingRemoval.add(nodeId);
     }
 
     public synchronized boolean cancelRemovePending(String nodeId) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 367616e..f76cb89 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -46,8 +46,8 @@ public class LockThenSearchOperationCallback extends 
AbstractOperationCallback i
     private int pkHash;
 
     public LockThenSearchOperationCallback(DatasetId datasetId, int[] 
entityIdFields,
-            ITransactionSubsystem txnSubsystem,
-            ITransactionContext txnCtx, IOperatorNodePushable 
operatorNodePushable) {
+            ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
+            IOperatorNodePushable operatorNodePushable) {
         super(datasetId, entityIdFields, txnCtx, 
txnSubsystem.getLockManager());
         this.operatorNodePushable = 
(LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
         this.logManager = txnSubsystem.getLogManager();
@@ -118,7 +118,7 @@ public class LockThenSearchOperationCallback extends 
AbstractOperationCallback i
                 lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
             }
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index e8de90d..b13a08e 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -56,7 +56,7 @@ public class PrimaryIndexInstantSearchOperationCallback 
extends AbstractOperatio
         try {
             lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -70,7 +70,7 @@ public class PrimaryIndexInstantSearchOperationCallback 
extends AbstractOperatio
         try {
             lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -80,7 +80,7 @@ public class PrimaryIndexInstantSearchOperationCallback 
extends AbstractOperatio
         try {
             lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 9e96fbb..a6cb61c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -89,7 +89,7 @@ public class PrimaryIndexModificationOperationCallback 
extends AbstractIndexModi
                 lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
             }
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -99,7 +99,7 @@ public class PrimaryIndexModificationOperationCallback 
extends AbstractIndexModi
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
             log(pkHash, after, before);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 5527f47..932c925 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -74,7 +74,7 @@ public class PrimaryIndexModificationOperationCallbackFactory 
extends AbstractOp
             txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 8c5b099..b339d27 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -70,7 +70,7 @@ public class 
SecondaryIndexModificationOperationCallbackFactory extends Abstract
             txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index a27f987..f897aca 100644
--- 
a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ 
b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -20,11 +20,11 @@ package 
org.apache.asterix.transaction.management.service.locking;
 
 import static org.mockito.Mockito.mock;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.io.IIOManager;
@@ -35,11 +35,11 @@ import 
org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 class TestRuntimeContextProvider implements IAppRuntimeContextProvider {
 
-    ThreadExecutor ate = new ThreadExecutor(Executors.defaultThreadFactory());
+    ExecutorService ate = 
Executors.newCachedThreadPool(Executors.defaultThreadFactory());
     IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class);
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return ate;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
 
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 8394057..067579e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.common.constraints;
 
+import java.util.Arrays;
+
 public class AlgebricksAbsolutePartitionConstraint extends 
AlgebricksPartitionConstraint {
     private final String[] locations;
 
@@ -33,4 +35,10 @@ public class AlgebricksAbsolutePartitionConstraint extends 
AlgebricksPartitionCo
     public String[] getLocations() {
         return locations;
     }
+
+    @Override
+    public String toString() {
+        return Arrays.toString(locations);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index 68274ce..2314f88 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@ public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma = new 
SplitterMaterializerActivityNode(
-                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma =
+                new SplitterMaterializerActivityNode(new ActivityId(odId, 
SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         for (int i = 0; i < outputArity; i++) {
@@ -168,7 +168,7 @@ public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor
                                 writers[i].close();
                             } catch (Throwable th) {
                                 if (hde == null) {
-                                    hde = new HyracksDataException(th);
+                                    hde = HyracksDataException.create(th);
                                 } else {
                                     hde.addSuppressed(th);
                                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 024f6f5..82f403e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -137,8 +137,8 @@ public class StringStreamingRuntimeFactory extends 
AbstractOneInputOneOutputRunt
                     ForwardScriptOutput fso = new ForwardScriptOutput(parser, 
process.getInputStream());
                     outputPipe = new Thread(fso);
                     outputPipe.start();
-                    DumpInStreamToPrintStream disps = new 
DumpInStreamToPrintStream(process.getErrorStream(),
-                            System.err);
+                    DumpInStreamToPrintStream disps =
+                            new 
DumpInStreamToPrintStream(process.getErrorStream(), System.err);
                     dumpStderr = new Thread(disps);
                     dumpStderr.start();
                 } catch (IOException e) {
@@ -174,7 +174,8 @@ public class StringStreamingRuntimeFactory extends 
AbstractOneInputOneOutputRunt
                     outputPipe.join();
                     dumpStderr.join();
                 } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
                 }
                 if (ret != 0) {
                     throw new HyracksDataException("Process exit value: " + 
ret);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index f4c5114..0e70759 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -59,7 +59,7 @@ public class IoUtil {
     public static void delete(File file) throws HyracksDataException {
         try {
             if (file.isDirectory()) {
-                FileUtils.deleteDirectory(file);
+                deleteDirectory(file);
             } else {
                 Files.delete(file.toPath());
             }
@@ -89,4 +89,39 @@ public class IoUtil {
             throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_FILE, e, 
fileRef.getAbsolutePath());
         }
     }
+
+    public static void deleteDirectory(File directory) throws IOException {
+        if (!directory.exists()) {
+            return;
+        }
+        if (!FileUtils.isSymlink(directory)) {
+            cleanDirectory(directory);
+        }
+        Files.delete(directory.toPath());
+    }
+
+    public static void cleanDirectory(final File directory) throws IOException 
{
+        final File[] files = verifiedListFiles(directory);
+        for (final File file : files) {
+            delete(file);
+        }
+    }
+
+    private static File[] verifiedListFiles(File directory) throws IOException 
{
+        if (!directory.exists()) {
+            final String message = directory + " does not exist";
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!directory.isDirectory()) {
+            final String message = directory + " is not a directory";
+            throw new IllegalArgumentException(message);
+        }
+
+        final File[] files = directory.listFiles();
+        if (files == null) { // null if security restricted
+            throw new IOException("Failed to list contents of " + directory);
+        }
+        return files;
+    }
 }

Reply via email to