abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2428

Change subject: [NO ISSUE][ING] Close and return on interrupt of ingestion
......................................................................

[NO ISSUE][ING] Close and return on interrupt of ingestion

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Previously, we would still attempt to write through the network
  when an ingestion task was interrupted.
- The goal was to get as much data in as possible, but when the
  cluster was in a bad state, this could lead to hanging threads.
- After this change, each record reader must implement the stop
  method correctly to allow for a graceful stop, while an interrupt
  always means: abort the task and return as soon as possible.

Change-Id: I6119617d133fb161a48b39f9812ec79e0189975b
---
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
4 files changed, 9 insertions(+), 18 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/28/2428/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 1a83603..04054f1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -250,7 +250,6 @@
     public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105;
     public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106;
     public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
-    public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
     public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
     public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
     public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 995b541..1fb8480 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -239,7 +239,6 @@
 3105 = %1$s is already registered
 3106 = %1$s is not registered
 3107 = Active Notification Handler is already suspended
-3108 = Feed stopped while waiting for a new record
 3109 = Function %1$s is being used. It cannot be dropped
 3110 = Feed failed while reading a new record
 3111 = Feed %1$s is not connected to any dataset
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 306a2a5..9826be7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -95,11 +95,7 @@
             }
         } catch (HyracksDataException e) {
             LOGGER.log(Level.WARN, "Exception during ingestion", e);
-            //if interrupted while waiting for a new record, then it is safe 
to not fail forward
             if (e.getComponent() == ErrorCode.ASTERIX
-                    && (e.getErrorCode() == 
ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
-                // Do nothing. interrupted by the active manager
-            } else if (e.getComponent() == ErrorCode.ASTERIX
                     && (e.getErrorCode() == 
ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
                 // Failure but we know we can for sure push the previously 
parsed records safely
                 failure = e;
@@ -141,11 +137,8 @@
     private IRawRecord<? extends T> next() throws Exception {
         try {
             return recordReader.next();
-        } catch (InterruptedException e) { // NOSONAR Gracefully handling 
interrupt to push records in the pipeline
-            if (flushing) {
-                throw e;
-            }
-            throw new 
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+        } catch (InterruptedException e) {
+            throw e;
         } catch (Exception e) {
             if (flushing) {
                 throw e;
@@ -161,11 +154,8 @@
         while (true) {
             try {
                 return recordReader.hasNext();
-            } catch (InterruptedException e) { // NOSONAR Gracefully handling 
interrupt to push records in the pipeline
-                if (flushing) {
-                    throw e;
-                }
-                throw new 
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+            } catch (InterruptedException e) {
+                throw e;
             } catch (Exception e) {
                 if (flushing) {
                     throw e;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 5a7b4b9..bb9d8c9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 import twitter4j.Query;
 import twitter4j.QueryResult;
 import twitter4j.Status;
@@ -43,6 +44,7 @@
     private int nextTweetIndex = 0;
     private long lastTweetIdReceived = 0;
     private GenericRecord<String> record;
+    private boolean stopped = false;
 
     public TwitterPullRecordReader(Twitter twitter, String keywords, int 
requestInterval) {
         this.twitter = twitter;
@@ -59,7 +61,7 @@
 
     @Override
     public boolean hasNext() throws Exception {
-        return true;
+        return !stopped;
     }
 
     @Override
@@ -90,7 +92,8 @@
 
     @Override
     public boolean stop() {
-        return false;
+        stopped = true;
+        return true;
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2428
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6119617d133fb161a48b39f9812ec79e0189975b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to