abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1051
Change subject: Fix Synchronization Issue in FeedRecordDataflowController
......................................................................
Fix Synchronization Issue in FeedRecordDataflowController
Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
1 file changed, 9 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/51/1051/1
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index be9056b..e1dc4e0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -53,6 +53,8 @@
protected final Object mutex = new Object();
protected final boolean sendMarker;
protected boolean failed = false;
+ private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
+ private Future<?> result;
public FeedRecordDataFlowController(IHyracksTaskContext ctx,
FeedTupleForwarder tupleForwarder,
@Nonnull FeedLogManager feedLogManager, int numOfOutputFields,
@Nonnull IRecordDataParser<T> dataParser,
@@ -68,9 +70,8 @@
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
ExecutorService executorService = sendMarker ?
Executors.newSingleThreadExecutor() : null;
- Future<?> result = null;
- if (sendMarker) {
- DataflowMarker dataflowMarker = new
DataflowMarker(recordReader.getProgressReporter(),
+ if (sendMarker && dataflowMarker == null) {
+ dataflowMarker = new
DataflowMarker(recordReader.getProgressReporter(),
TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE,
ctx));
result = executorService.submit(dataflowMarker);
}
@@ -84,7 +85,7 @@
IRawRecord<? extends T> record = recordReader.next();
if (record == null) {
flush();
- wait(INTERVAL);
+ mutex.wait(INTERVAL);
continue;
}
tb.reset();
@@ -102,6 +103,7 @@
throw new HyracksDataException(e);
}
try {
+ dataflowMarker.stop();
tupleForwarder.close();
} catch (Throwable th) {
hde =
ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
@@ -162,6 +164,9 @@
@Override
public boolean stop() throws HyracksDataException {
+ if (dataflowMarker != null) {
+ dataflowMarker.stop();
+ }
HyracksDataException hde = null;
if (recordReader.stop()) {
if (failed) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1051
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia290499132c320ce99402e698c76ba6d944f8f3d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>