abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1051

Change subject: Fix Synchronization Issue in FeedRecordDataflowController
......................................................................

Fix Synchronization Issue in FeedRecordDataflowController

Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
1 file changed, 9 insertions(+), 4 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/51/1051/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index be9056b..e1dc4e0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -53,6 +53,8 @@
     protected final Object mutex = new Object();
     protected final boolean sendMarker;
     protected boolean failed = false;
+    private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
+    private Future<?> result;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
             @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, 
@Nonnull IRecordDataParser<T> dataParser,
@@ -68,9 +70,8 @@
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         ExecutorService executorService = sendMarker ? 
Executors.newSingleThreadExecutor() : null;
-        Future<?> result = null;
-        if (sendMarker) {
-            DataflowMarker dataflowMarker = new 
DataflowMarker(recordReader.getProgressReporter(),
+        if (sendMarker && dataflowMarker == null) {
+            dataflowMarker = new 
DataflowMarker(recordReader.getProgressReporter(),
                     TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, 
ctx));
             result = executorService.submit(dataflowMarker);
         }
@@ -84,7 +85,7 @@
                     IRawRecord<? extends T> record = recordReader.next();
                     if (record == null) {
                         flush();
-                        wait(INTERVAL);
+                        mutex.wait(INTERVAL);
                         continue;
                     }
                     tb.reset();
@@ -102,6 +103,7 @@
             throw new HyracksDataException(e);
         }
         try {
+            dataflowMarker.stop();
             tupleForwarder.close();
         } catch (Throwable th) {
             hde = 
ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
@@ -162,6 +164,9 @@
 
     @Override
     public boolean stop() throws HyracksDataException {
+        if (dataflowMarker != null) {
+            dataflowMarker.stop();
+        }
         HyracksDataException hde = null;
         if (recordReader.stop()) {
             if (failed) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1051
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to