Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1052
Change subject: small refactoring
......................................................................
small refactoring
Change-Id: I37eab1645416e3aad6119bba527c5e3b4b98fddc
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
1 file changed, 21 insertions(+), 15 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/52/1052/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4189dbf..10e9125 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -54,7 +54,7 @@
protected final boolean sendMarker;
protected boolean failed = false;
private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
- private Future<?> result;
+ private Future<?> dataflowMarkerResult;
public FeedRecordDataFlowController(IHyracksTaskContext ctx,
FeedTupleForwarder tupleForwarder,
@Nonnull FeedLogManager feedLogManager, int numOfOutputFields,
@Nonnull IRecordDataParser<T> dataParser,
@@ -69,12 +69,7 @@
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
- ExecutorService executorService = sendMarker ?
Executors.newSingleThreadExecutor() : null;
- if (sendMarker && dataflowMarker == null) {
- dataflowMarker = new
DataflowMarker(recordReader.getProgressReporter(),
- TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE,
ctx));
- result = executorService.submit(dataflowMarker);
- }
+ startDataflowMarker();
HyracksDataException hde = null;
try {
failed = false;
@@ -102,9 +97,7 @@
LOGGER.warn("Failure while operating a feed source", e);
throw new HyracksDataException(e);
}
- if(dataflowMarker != null){
- dataflowMarker.stop();
- }
+ stopDataflowMarker();
try {
tupleForwarder.close();
} catch (Throwable th) {
@@ -117,8 +110,8 @@
hde =
ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
} finally {
closeSignal();
- if (sendMarker && result != null) {
- result.cancel(true);
+ if (sendMarker && dataflowMarkerResult != null) {
+ dataflowMarkerResult.cancel(true);
}
}
if (hde != null) {
@@ -149,6 +142,21 @@
protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends
T> record) throws IOException {
}
+ private void startDataflowMarker() {
+ ExecutorService executorService = sendMarker ?
Executors.newSingleThreadExecutor() : null;
+ if (sendMarker && dataflowMarker == null) {
+ dataflowMarker = new
DataflowMarker(recordReader.getProgressReporter(),
+ TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE,
ctx));
+ dataflowMarkerResult = executorService.submit(dataflowMarker);
+ }
+ }
+
+ private void stopDataflowMarker() {
+ if (dataflowMarker != null) {
+ dataflowMarker.stop();
+ }
+ }
+
private void closeSignal() {
synchronized (closed) {
closed.set(true);
@@ -166,9 +174,7 @@
@Override
public boolean stop() throws HyracksDataException {
- if (dataflowMarker != null) {
- dataflowMarker.stop();
- }
+ stopDataflowMarker();
HyracksDataException hde = null;
if (recordReader.stop()) {
if (failed) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1052
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I37eab1645416e3aad6119bba527c5e3b4b98fddc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>