From Ali Alsuliman <[email protected]>:

Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465 )


Change subject: [NO ISSUE][HYR] Keep trying to cancel job tasks
......................................................................

[NO ISSUE][HYR] Keep trying to cancel job tasks

- user model changes: no
- storage format changes: no
- interface changes: no

Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
1 file changed, 31 insertions(+), 2 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/65/19465/1

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 17f5cb1..9bbd161 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -266,11 +267,26 @@
     private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
         try {
             startSemaphore.acquireUninterruptibly();
+            cancelTasks(tasks, completeSemaphore);
+        } catch (Throwable th) {
+            completeSemaphore.acquireUninterruptibly();
+            throw th;
+        }
+    }
+
+    private static void cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
+        while (true) {
             for (Future<Void> task : tasks) {
                 task.cancel(true);
             }
-        } finally {
-            completeSemaphore.acquireUninterruptibly();
+            try {
+                if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+                    return;
+                }
+                // log warn not all tasks were cancelled within 5 minutes
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }
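
Review note on the retry loop above: Semaphore.tryAcquire(long, TimeUnit) throws InterruptedException immediately when the calling thread's interrupt flag is already set, so calling Thread.currentThread().interrupt() inside the loop may cause every later iteration to throw again without ever acquiring the permit. The following is only a sketch of one possible alternative, not the change as submitted: it remembers the interrupt and restores the flag once the permit has been acquired. The class name CancelRetrySketch, the method name cancelUntilComplete, the timeout parameters, and the returned round count are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical standalone sketch; not part of SuperActivityOperatorNodePushable.
final class CancelRetrySketch {
    private CancelRetrySketch() {
    }

    /**
     * Cancels all tasks repeatedly until a permit can be taken from completeSemaphore.
     * Returns the number of cancellation rounds that were needed (illustration only).
     */
    static int cancelUntilComplete(List<? extends Future<?>> tasks, Semaphore completeSemaphore, long timeout,
            TimeUnit unit) {
        boolean interrupted = false;
        int rounds = 0;
        try {
            while (true) {
                rounds++;
                for (Future<?> task : tasks) {
                    task.cancel(true);
                }
                try {
                    if (completeSemaphore.tryAcquire(timeout, unit)) {
                        return rounds;
                    }
                    // Assumption: a real implementation would log a warning here.
                } catch (InterruptedException e) {
                    // Remember the interrupt but keep waiting; the flag is restored in finally.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With this shape, an interrupt costs at most one extra cancellation round, and the caller still observes the interrupt flag after the method returns.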

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
Gerrit-Change-Number: 19465
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange
