Till Westmann has submitted this change and it was merged.

Change subject: Fix Synchronization Issue in FeedRecordDataflowController
......................................................................
Fix Synchronization Issue in FeedRecordDataflowController Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d Reviewed-on: https://asterix-gerrit.ics.uci.edu/1051 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java 1 file changed, 11 insertions(+), 4 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index be9056b..4189dbf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -53,6 +53,8 @@ protected final Object mutex = new Object(); protected final boolean sendMarker; protected boolean failed = false; + private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker; + private Future<?> result; public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser, @@ -68,9 +70,8 @@ @Override public void start(IFrameWriter writer) throws HyracksDataException { ExecutorService executorService = sendMarker ? 
Executors.newSingleThreadExecutor() : null; - Future<?> result = null; - if (sendMarker) { - DataflowMarker dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(), + if (sendMarker && dataflowMarker == null) { + dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(), TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx)); result = executorService.submit(dataflowMarker); } @@ -84,7 +85,7 @@ IRawRecord<? extends T> record = recordReader.next(); if (record == null) { flush(); - wait(INTERVAL); + mutex.wait(INTERVAL); continue; } tb.reset(); @@ -100,6 +101,9 @@ tupleForwarder.flush(); LOGGER.warn("Failure while operating a feed source", e); throw new HyracksDataException(e); + } + if(dataflowMarker != null){ + dataflowMarker.stop(); } try { tupleForwarder.close(); @@ -162,6 +166,9 @@ @Override public boolean stop() throws HyracksDataException { + if (dataflowMarker != null) { + dataflowMarker.stop(); + } HyracksDataException hde = null; if (recordReader.stop()) { if (failed) { -- To view, visit https://asterix-gerrit.ics.uci.edu/1051 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
