From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19469 )
Change subject: [ASTERIXDB-3570][RT] Re-interrupt running threads on tasks cancelation ...................................................................... [ASTERIXDB-3570][RT] Re-interrupt running threads on tasks cancelation - user model changes: no - storage format changes: no - interface changes: no Details: - Re-interrupt running threads on tasks cancelation when threads don't finish within some period of time. Ext-ref: MB-65432 Change-Id: I7d056402343e9f80610fef88fda8ca3cda729f04 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19469 Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Michael Blow <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java 1 file changed, 52 insertions(+), 11 deletions(-) Approvals: Michael Blow: Looks good to me, approved; Verified Anon. E. Moose #1000171: Jenkins: Verified diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 83d0eb8..338c799 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -65,6 +66,7 @@ private static final Logger LOGGER = LogManager.getLogger(); private static final 
String CLASS_ABBREVIATION = "SAO"; + private static final long TASKS_COMPLETION_POLL_SECONDS = TimeUnit.MINUTES.toSeconds(2); private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>(); private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>(); private final Map<ActivityId, IActivity> startActivities; @@ -226,6 +228,7 @@ private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException { List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size()); + Set<Thread> runningThreads = ConcurrentHashMap.newKeySet(); Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size()); final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size()); final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size()); @@ -233,6 +236,7 @@ try { for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) { tasks.add(ctx.getExecutorService().submit(() -> { + runningThreads.add(Thread.currentThread()); startSemaphore.release(); try { Thread.currentThread().setName(CLASS_ABBREVIATION + ":" + ctx.getJobletContext().getJobId() @@ -243,6 +247,7 @@ throw th; } finally { ctx.unsubscribeThreadFromStats(); + runningThreads.remove(Thread.currentThread()); completeSemaphore.release(); } return null; @@ -264,17 +269,18 @@ } if (root != null) { final Throwable failure = root; - cancelTasks(tasks, startSemaphore, completeSemaphore); + cancelTasks(tasks, runningThreads, startSemaphore, completeSemaphore); failures.forEach(t -> ExceptionUtils.suppress(failure, t)); throw HyracksDataException.create(failure); } } - private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) { + private void cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore startSemaphore, + Semaphore completeSemaphore) { boolean cancelCompleted = false; try { 
startSemaphore.acquireUninterruptibly(); - cancelCompleted = cancelTasks(tasks, completeSemaphore); + cancelCompleted = cancelTasks(tasks, runningThreads, completeSemaphore); } finally { if (!cancelCompleted) { completeSemaphore.acquireUninterruptibly(); @@ -282,17 +288,29 @@ } } - private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) { - Span retryWait = Span.init(5, TimeUnit.MINUTES); + private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore completeSemaphore) { + for (Future<Void> task : tasks) { + task.cancel(true); + } + Span completionPoll = Span.init(TASKS_COMPLETION_POLL_SECONDS, TimeUnit.SECONDS); while (true) { - for (Future<Void> task : tasks) { - task.cancel(true); - } - retryWait.reset(); - if (retryWait.tryAcquireUninterruptibly(completeSemaphore)) { + completionPoll.reset(); + if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) { return true; } - LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling..."); + LOGGER.warn("Tasks of job {} did not complete within {}; interrupting running threads {}", + ctx.getJobletContext().getJobId(), completionPoll, runningThreads); + interruptRunningThreads(runningThreads); + } + } + + private static void interruptRunningThreads(Set<Thread> threads) { + for (Thread thread : threads) { + try { + thread.interrupt(); + } catch (Throwable t) { + LOGGER.debug("failed to interrupt thread {}", thread, t); + } } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19469 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: I7d056402343e9f80610fef88fda8ca3cda729f04 Gerrit-Change-Number: 19469 Gerrit-PatchSet: 3 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-MessageType: merged
