abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1968
Change subject: [ASTERIXDB-2067][ING] Handle Failures in Controller Flush
......................................................................
[ASTERIXDB-2067][ING] Handle Failures in Controller Flush
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Failures that happen in feed while reading from external
sources allows ingestion pipeline to close gracefully
pushing parsed records in the frame forward before
failing.
- There was an assumption that when hasNext() or next()
are being called on a data reader and we fail, then
the failure didn't affect the integrity of the pipeline.
- This assumption is incorrect as hasNext() and next()
can themselves flush the pipeline and if the failure
happened during the flush call, the pipeline must be
failed.
Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
2 files changed, 21 insertions(+), 2 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/68/1968/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 53fa137..ca15b353 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -30,6 +30,7 @@
protected final int numOfFields;
protected final ArrayTupleBuilder tb;
protected final FeedLogManager feedLogManager;
+ protected boolean flushing;
public AbstractFeedDataFlowController(IHyracksTaskContext ctx,
FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfFields) {
@@ -54,8 +55,14 @@
@Override
public void flush() throws HyracksDataException {
+ flushing = true;
tupleForwarder.flush();
+ flushing = false;
}
public abstract String getStats();
+
+ public void fail() throws HyracksDataException {
+ tupleForwarder.fail();
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4ed1b08..c85e236 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -131,12 +131,18 @@
return state;
}
- private IRawRecord<? extends T> next() throws HyracksDataException {
+ private IRawRecord<? extends T> next() throws Exception {
try {
return recordReader.next();
} catch (InterruptedException e) { // NOSONAR Gracefully handling
interrupt to push records in the pipeline
+ if (flushing) {
+ throw e;
+ }
throw new
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
} catch (Exception e) {
+ if (flushing) {
+ throw e;
+ }
if (!recordReader.handleException(e)) {
throw new
RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
}
@@ -144,13 +150,19 @@
}
}
- private boolean hasNext() throws HyracksDataException {
+ private boolean hasNext() throws Exception {
while (true) {
try {
return recordReader.hasNext();
} catch (InterruptedException e) { // NOSONAR Gracefully handling
interrupt to push records in the pipeline
+ if (flushing) {
+ throw e;
+ }
throw new
RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
} catch (Exception e) {
+ if (flushing) {
+ throw e;
+ }
if (!recordReader.handleException(e)) {
throw new
RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1968
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>