abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2428
Change subject: [NO ISSUE][ING] Close and return on interrupt of ingestion
......................................................................
[NO ISSUE][ING] Close and return on interrupt of ingestion
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Previously, we would still attempt to write through the network
when an ingestion task was interrupted.
- The goal was to get as much data in as possible, but in
the case where the cluster was in a bad state, this could
lead to hanging threads.
- After this change, each record reader must implement the stop
method correctly to allow for a graceful stop, while an interrupt
always means: abort the task and return as soon as possible.
Change-Id: I6119617d133fb161a48b39f9812ec79e0189975b
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
4 files changed, 9 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/28/2428/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 1a83603..04054f1 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -250,7 +250,6 @@
public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105;
public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106;
public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
- public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 995b541..1fb8480 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -239,7 +239,6 @@
3105 = %1$s is already registered
3106 = %1$s is not registered
3107 = Active Notification Handler is already suspended
-3108 = Feed stopped while waiting for a new record
3109 = Function %1$s is being used. It cannot be dropped
3110 = Feed failed while reading a new record
3111 = Feed %1$s is not connected to any dataset
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 306a2a5..9826be7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -95,11 +95,7 @@
}
} catch (HyracksDataException e) {
LOGGER.log(Level.WARN, "Exception during ingestion", e);
- //if interrupted while waiting for a new record, then it is safe
to not fail forward
if (e.getComponent() == ErrorCode.ASTERIX
- && (e.getErrorCode() ==
ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
- // Do nothing. interrupted by the active manager
- } else if (e.getComponent() == ErrorCode.ASTERIX
&& (e.getErrorCode() ==
ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
// Failure but we know we can for sure push the previously
parsed records safely
failure = e;
@@ -141,11 +137,8 @@
private IRawRecord<? extends T> next() throws Exception {
try {
return recordReader.next();
- } catch (InterruptedException e) { // NOSONAR Gracefully handling
interrupt to push records in the pipeline
- if (flushing) {
- throw e;
- }
- throw new
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
if (flushing) {
throw e;
@@ -161,11 +154,8 @@
while (true) {
try {
return recordReader.hasNext();
- } catch (InterruptedException e) { // NOSONAR Gracefully handling
interrupt to push records in the pipeline
- if (flushing) {
- throw e;
- }
- throw new
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
if (flushing) {
throw e;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 5a7b4b9..bb9d8c9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
@@ -43,6 +44,7 @@
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
private GenericRecord<String> record;
+ private boolean stopped = false;
public TwitterPullRecordReader(Twitter twitter, String keywords, int
requestInterval) {
this.twitter = twitter;
@@ -59,7 +61,7 @@
@Override
public boolean hasNext() throws Exception {
- return true;
+ return !stopped;
}
@Override
@@ -90,7 +92,8 @@
@Override
public boolean stop() {
- return false;
+ stopped = true;
+ return true;
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/2428
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6119617d133fb161a48b39f9812ec79e0189975b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>