abdullah alamoudi has submitted this change and it was merged. Change subject: Fix Error in Aborting Task in Super Activity ......................................................................
Fix Error in Aborting Task in Super Activity When aborting a task, its thread gets interrupted. This creates a problem when interrupting a thread that is waiting on the parallel actions of a super activity: the actions already submitted to the executor are not cancelled. They are now cancelled when submitting or waiting fails, and operator.initialize() in Task is moved inside the inner try block. Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/582 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java 2 files changed, 20 insertions(+), 18 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 4e842bb..827e478 100644 --- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -196,24 +196,26 @@ private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException { List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>(); - // Run one action for all OperatorNodePushables in parallel through a thread pool. - for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) { - initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - opAction.runAction(op); - return null; - } - })); - } - - // Waits until all parallel actions to finish. 
- for (Future<Void> initializationTask : initializationTasks) { - try { - initializationTask.get(); - } catch (Exception e) { - throw new HyracksDataException(e); + try { + // Run one action for all OperatorNodePushables in parallel through a thread pool. + for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) { + initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + opAction.runAction(op); + return null; + } + })); } + // Waits until all parallel actions to finish. + for (Future<Void> initializationTask : initializationTasks) { + initializationTask.get(); + } + } catch (Throwable th) { + for (Future<Void> initializationTask : initializationTasks) { + initializationTask.cancel(true); + } + throw new HyracksDataException(th); } } } diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 12df264..61baf82 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -255,8 +255,8 @@ addPendingThread(ct); try { ct.setName(displayName + ":" + taskAttemptId + ":" + 0); - operator.initialize(); try { + operator.initialize(); if (collectors.length > 0) { final Semaphore sem = new Semaphore(collectors.length - 1); for (int i = 1; i < collectors.length; ++i) { -- To view, visit https://asterix-gerrit.ics.uci.edu/582 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4 Gerrit-PatchSet: 3 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email 
protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
