>From Michael Blow <[email protected]>:

Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19470 )


Change subject: [NO ISSUE][HYR][RT] Log pre-cancel thread stack on stuck canceled tasks, some interrupt fixes
......................................................................

[NO ISSUE][HYR][RT] Log pre-cancel thread stack on stuck canceled tasks, some interrupt fixes

Ext-ref: MB-65432
Change-Id: I926d16500f404397875c2401a458015f1636e21a
---
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
M 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
3 files changed, 39 insertions(+), 6 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/70/19470/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index c7d2d94..c394220 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;

 public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
     protected IFrameWriter writer;
@@ -53,7 +54,7 @@
         try {
             fail();
         } catch (Throwable th) {
-            failure.addSuppressed(th);
+            ExceptionUtils.suppress(failure, th);
         }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 338c799..cf6be6e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -37,6 +37,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;

 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -289,6 +290,12 @@
     }

     private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> 
runningThreads, Semaphore completeSemaphore) {
+        Map<Thread, Throwable> preCancelStackTraces = new HashMap<>();
+        for (Thread runningThread : runningThreads) {
+            Throwable threadStack = new Throwable();
+            threadStack.setStackTrace(runningThread.getStackTrace());
+            preCancelStackTraces.put(runningThread, threadStack);
+        }
         for (Future<Void> task : tasks) {
             task.cancel(true);
         }
@@ -298,8 +305,15 @@
             if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) {
                 return true;
             }
-            LOGGER.warn("Tasks of job {} did not complete within {}; 
interrupting running threads {}",
-                    ctx.getJobletContext().getJobId(), completionPoll, 
runningThreads);
+            
preCancelStackTraces.keySet().removeIf(Predicate.not(runningThreads::contains));
+            LOGGER.warn(
+                    "Tasks of job {} did not complete within {}; interrupting 
running threads (incl. stack trace just before last interrupt) {}",
+                    ctx.getJobletContext().getJobId(), completionPoll, 
preCancelStackTraces);
+            for (Thread runningThread : runningThreads) {
+                Throwable threadStack = new Throwable();
+                threadStack.setStackTrace(runningThread.getStackTrace());
+                preCancelStackTraces.put(runningThread, threadStack);
+            }
             interruptRunningThreads(runningThreads);
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index be8be9c..226f70a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -85,7 +85,9 @@
                 } catch (Throwable e) {
                     // First, clear the interrupted flag
                     interrupted |= Thread.interrupted();
-                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                    if (ExceptionUtils.causedByInterrupt(e)) {
+                        interrupted = true;
+                    } else {
                         // The cause isn't an interruption, rethrow
                         throw e;
                     }
@@ -130,7 +132,9 @@
                 } catch (Throwable e) {
                     // First, clear the interrupted flag
                     interrupted |= Thread.interrupted();
-                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                    if (ExceptionUtils.causedByInterrupt(e)) {
+                        interrupted = true;
+                    } else {
                         // The cause isn't an interruption, rethrow
                         throw e;
                     }
@@ -179,7 +183,11 @@
                 if (retryPolicy == null) {
                     retryPolicy = new 
ExponentialRetryPolicy(NUMBER_OF_RETRIES, MAX_DELAY_BETWEEN_RETRIES);
                 }
-                if (!retryPolicy.retry(e)) {
+                if (ExceptionUtils.causedByInterrupt(e)) {
+                    LOGGER.warn("Lost suppressed interrupt during 
ICloudReturnableRequest", e);
+                    Thread.currentThread().interrupt();
+                }
+                if (Thread.currentThread().isInterrupted() || 
!retryPolicy.retry(e)) {
                     throw HyracksDataException.create(e);
                 }
                 attempt++;

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19470
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I926d16500f404397875c2401a458015f1636e21a
Gerrit-Change-Number: 19470
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-MessageType: newchange

Reply via email to