abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1964
Change subject: [ASTERIXDB-2064][ING] Timeout Stop Feed
......................................................................
[ASTERIXDB-2064][ING] Timeout Stop Feed
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- The abort feed message stops the reader and waits for the
dataflow controller to signal end of life.
- If the reader returns true to stop but the dataflow controller
never signals end of life, the abort can get stuck.
- This change adds a timeout, after which the task thread is
interrupted.
Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
8 files changed, 37 insertions(+), 12 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/64/1964/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 7412338..f59b82e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -40,7 +40,7 @@
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
- public default boolean stop() throws HyracksDataException {
+ public default boolean stop(long timeout) throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index e24c26d..921739f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -213,16 +213,28 @@
}
}
- private void waitForSignal() throws InterruptedException {
+ private void waitForSignal(long timeout) throws InterruptedException,
HyracksDataException {
+ long start = System.currentTimeMillis();
+ long now = start;
synchronized (closed) {
while (!closed.get()) {
- closed.wait();
+ long sofar = now - start;
+ long remaining = timeout - sofar;
+ if (remaining > 0) {
+ closed.wait(remaining);
+ now = System.currentTimeMillis();
+ sofar = now - start;
+ remaining = timeout - sofar;
+ }
+ if (!closed.get() && remaining <= 0) {
+ throw
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT);
+ }
}
}
}
@Override
- public boolean stop() throws HyracksDataException {
+ public boolean stop(long timeout) throws HyracksDataException {
synchronized (this) {
switch (state) {
case CREATED:
@@ -238,7 +250,7 @@
}
if (recordReader.stop()) {
try {
- waitForSignal();
+ waitForSignal(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 1f1f545..025520e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -71,7 +71,7 @@
}
@Override
- public boolean stop() throws HyracksDataException {
+ public boolean stop(long timeout) throws HyracksDataException {
try {
if (stream.stop()) {
return true;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index eeda80c..fd9db7e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -35,8 +35,8 @@
controller.start(writer);
}
- public boolean stop() throws HyracksDataException {
- return controller.stop();
+ public boolean stop(long timeout) throws HyracksDataException {
+ return controller.stop(timeout);
}
public boolean pause() throws HyracksDataException {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 16b8fba..ff48f51 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -121,9 +122,19 @@
LOGGER.info(runtimeId + " aborting...");
synchronized (this) {
poisoned = true;
- if (!adapter.stop()) {
- LOGGER.info(runtimeId + " failed to stop adapter. interrupting
the thread...");
- taskThread.interrupt();
+ try {
+ if (!adapter.stop(10000)) {
+ LOGGER.info(runtimeId + " failed to stop adapter.
interrupting the thread...");
+ taskThread.interrupt();
+ }
+ } catch (HyracksDataException hde) {
+ if (hde.getComponent() == ErrorCode.HYRACKS &&
hde.getErrorCode() == ErrorCode.TIMEOUT) {
+ LOGGER.log(Level.WARNING, runtimeId + " stop adapter timed
out. interrupting the thread...", hde);
+ taskThread.interrupt();
+ } else {
+ LOGGER.log(Level.WARNING, "Failure during attempt to stop
" + runtimeId, hde);
+ throw hde;
+ }
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 2273bea..fcd010d 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -137,7 +137,7 @@
}
@Override
- public boolean stop() {
+ public boolean stop(long timeout) {
generator.stop();
return true;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b054faf..ff98efa 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -124,6 +124,7 @@
public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
public static final int ILLEGAL_MEMORY_BUDGET = 90;
+ public static final int TIMEOUT = 91;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1d2143b..6d4ccdb 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -107,5 +107,6 @@
88 = Cannot modify index (Disk is full)
89 = The byte size of a single group (%1$s bytes) exceeds the budget for a
group by operator (%2$s bytes)
90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the
minimum (%3$s bytes)
+91 = Operation timed out
10000 = The given rule collection %1$s is not an instance of the List class.
--
To view, visit https://asterix-gerrit.ics.uci.edu/1964
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>