abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1964

Change subject: [ASTERIXDB-2064][ING] Timeout Stop Feed
......................................................................

[ASTERIXDB-2064][ING] Timeout Stop Feed

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- The abort feed message stops the reader and wait for the
  dataflow controller to signal end of life.
- If the reader returns true to stop but the dataflow controller
  never signal ends, it can get stuck.
- This change adds a timeout after which, the task thread is
  interrupted.

Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
8 files changed, 37 insertions(+), 12 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/64/1964/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index 7412338..f59b82e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -40,7 +40,7 @@
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
-    public default boolean stop() throws HyracksDataException {
+    public default boolean stop(long timeout) throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index e24c26d..921739f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -213,16 +213,28 @@
         }
     }
 
-    private void waitForSignal() throws InterruptedException {
+    private void waitForSignal(long timeout) throws InterruptedException, 
HyracksDataException {
+        long start = System.currentTimeMillis();
+        long now = start;
         synchronized (closed) {
             while (!closed.get()) {
-                closed.wait();
+                long sofar = now - start;
+                long remaining = timeout - sofar;
+                if (remaining > 0) {
+                    closed.wait(remaining);
+                    now = System.currentTimeMillis();
+                    sofar = now - start;
+                    remaining = timeout - sofar;
+                }
+                if (!closed.get() && remaining <= 0) {
+                    throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.TIMEOUT);
+                }
             }
         }
     }
 
     @Override
-    public boolean stop() throws HyracksDataException {
+    public boolean stop(long timeout) throws HyracksDataException {
         synchronized (this) {
             switch (state) {
                 case CREATED:
@@ -238,7 +250,7 @@
         }
         if (recordReader.stop()) {
             try {
-                waitForSignal();
+                waitForSignal(timeout);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 1f1f545..025520e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public boolean stop() throws HyracksDataException {
+    public boolean stop(long timeout) throws HyracksDataException {
         try {
             if (stream.stop()) {
                 return true;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index eeda80c..fd9db7e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -35,8 +35,8 @@
         controller.start(writer);
     }
 
-    public boolean stop() throws HyracksDataException {
-        return controller.stop();
+    public boolean stop(long timeout) throws HyracksDataException {
+        return controller.stop(timeout);
     }
 
     public boolean pause() throws HyracksDataException {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 16b8fba..ff48f51 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -121,9 +122,19 @@
         LOGGER.info(runtimeId + " aborting...");
         synchronized (this) {
             poisoned = true;
-            if (!adapter.stop()) {
-                LOGGER.info(runtimeId + " failed to stop adapter. interrupting 
the thread...");
-                taskThread.interrupt();
+            try {
+                if (!adapter.stop(10000)) {
+                    LOGGER.info(runtimeId + " failed to stop adapter. 
interrupting the thread...");
+                    taskThread.interrupt();
+                }
+            } catch (HyracksDataException hde) {
+                if (hde.getComponent() == ErrorCode.HYRACKS && 
hde.getErrorCode() == ErrorCode.TIMEOUT) {
+                    LOGGER.log(Level.WARNING, runtimeId + " stop adapter timed 
out. interrupting the thread...", hde);
+                    taskThread.interrupt();
+                } else {
+                    LOGGER.log(Level.WARNING, "Failure during attempt to stop 
" + runtimeId, hde);
+                    throw hde;
+                }
             }
         }
     }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 2273bea..fcd010d 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -137,7 +137,7 @@
     }
 
     @Override
-    public boolean stop() {
+    public boolean stop(long timeout) {
         generator.stop();
         return true;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b054faf..ff98efa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -124,6 +124,7 @@
     public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
     public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
     public static final int ILLEGAL_MEMORY_BUDGET = 90;
+    public static final int TIMEOUT = 91;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 1d2143b..6d4ccdb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -107,5 +107,6 @@
 88 = Cannot modify index (Disk is full)
 89 = The byte size of a single group (%1$s bytes) exceeds the budget for a 
group by operator (%2$s bytes)
 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the 
minimum (%3$s bytes)
+91 = Operation timed out
 
 10000 = The given rule collection %1$s is not an instance of the List class.

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1964
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If609a8343767ee7a80689a58af35a1b3fca2964b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to