abdullah alamoudi has submitted this change and it was merged.

Change subject: [ASTERIXDB-2067][ING] Handle Failures in Controller Flush
......................................................................

[ASTERIXDB-2067][ING] Handle Failures in Controller Flush

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Failures that happen in a feed while reading from external
  sources allow the ingestion pipeline to close gracefully,
  pushing parsed records in the frame forward before failing.
- There was an assumption that when hasNext() or next() is
  called on a data reader and fails, the failure does not
  affect the integrity of the pipeline.
- This assumption is incorrect: hasNext() and next() can
  themselves flush the pipeline, and if the failure happened
  during the flush call, the pipeline must be failed.

Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1968
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
3 files changed, 20 insertions(+), 2 deletions(-)

Approvals:
  Anon. E. Moose #1000171:
  Jenkins: Verified; ; Verified
  Murtadha Hubail: Looks good to me, approved

Objections:
  Jenkins: Violations found


diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 7b7de93..73439ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -171,6 +171,9 @@
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
             synchronized (listener) {
+                if (cancelRecovery) {
+                    return null;
+                }
                 if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
                     listener.setState(ActivityState.PERMANENTLY_FAILED);
                 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 53fa137..3437de1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -30,6 +30,7 @@
     protected final int numOfFields;
     protected final ArrayTupleBuilder tb;
     protected final FeedLogManager feedLogManager;
+    protected boolean flushing;
 
     public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfFields) {
@@ -54,7 +55,9 @@
 
     @Override
     public void flush() throws HyracksDataException {
+        flushing = true;
         tupleForwarder.flush();
+        flushing = false;
     }
 
     public abstract String getStats();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4ed1b08..c85e236 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -131,12 +131,18 @@
         return state;
     }
 
-    private IRawRecord<? extends T> next() throws HyracksDataException {
+    private IRawRecord<? extends T> next() throws Exception {
         try {
             return recordReader.next();
         } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+            if (flushing) {
+                throw e;
+            }
             throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
         } catch (Exception e) {
+            if (flushing) {
+                throw e;
+            }
             if (!recordReader.handleException(e)) {
                 throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
             }
@@ -144,13 +150,19 @@
         }
     }
 
-    private boolean hasNext() throws HyracksDataException {
+    private boolean hasNext() throws Exception {
         while (true) {
             try {
                 return recordReader.hasNext();
             } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+                if (flushing) {
+                    throw e;
+                }
                 throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
             } catch (Exception e) {
+                if (flushing) {
+                    throw e;
+                }
                 if (!recordReader.handleException(e)) {
                     throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
                 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1968
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
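
Illustration (not part of the change): the flag logic described in the commit message can be sketched in isolation. This is a minimal standalone sketch, not AsterixDB code. The names FlushingFlagSketch, Reader, Controller and classify are hypothetical, and plain JDK exceptions stand in for HyracksDataException and RuntimeDataException. It assumes, as the commit message states, that a reader's hasNext() may itself trigger a flush.

```java
import java.io.IOException;

public class FlushingFlagSketch {

    /** Hypothetical stand-in for a record reader whose hasNext() may flush the pipeline. */
    interface Reader {
        boolean hasNext() throws Exception;
    }

    /** Hypothetical controller that mirrors the flag handling added in this change. */
    static class Controller {
        private boolean flushing;
        private final boolean failOnFlush;

        Controller(boolean failOnFlush) {
            this.failOnFlush = failOnFlush;
        }

        void flush() {
            flushing = true;
            if (failOnFlush) {
                // The flag deliberately stays true when the flush itself fails.
                throw new IllegalStateException("downstream write failed");
            }
            flushing = false;
        }

        boolean hasNext(Reader reader) throws Exception {
            try {
                return reader.hasNext();
            } catch (Exception e) {
                if (flushing) {
                    // Failure happened inside flush: the pipeline is compromised, so rethrow as-is.
                    throw e;
                }
                // Failure happened in the reader only: wrap it as a recoverable feed error.
                throw new RuntimeException("feed failed while getting a new record", e);
            }
        }
    }

    /** Runs one hasNext() call and reports which kind of failure surfaced. */
    static String classify(boolean failOnFlush) {
        Controller controller = new Controller(failOnFlush);
        // The reader flushes first (as an idle reader might), then fails on its source.
        Reader reader = () -> {
            controller.flush();
            throw new IOException("source unavailable");
        };
        try {
            controller.hasNext(reader);
            return "no failure";
        } catch (IllegalStateException e) {
            return "pipeline failure";
        } catch (Exception e) {
            return "reader failure";
        }
    }

    public static void main(String[] args) {
        System.out.println("flush fails:    " + classify(true));
        System.out.println("flush succeeds: " + classify(false));
    }
}
```

In this sketch the flag is left set when flush() throws, so the catch block in hasNext() can tell a flush failure apart from a reader failure. Whether the production code relies on exactly this behaviour is an inference from the diff above.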
