abdullah alamoudi has submitted this change and it was merged.

Change subject: Fix Error in Aborting Task in Super Activity
......................................................................


Fix Error in Aborting Task in Super Activity

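When aborting a task, its thread gets interrupted. This creates
a problem when interrupting a thread that is waiting on the parallel
actions of a super activity: the actions it had already submitted were
not cancelled. Cancel all submitted actions when submitting or waiting
fails, and move operator.initialize() inside the inner try block in Task.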

Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/582
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M 
hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
M 
hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
2 files changed, 20 insertions(+), 18 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified



diff --git 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4e842bb..827e478 100644
--- 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -196,24 +196,26 @@
 
     private void runInParallel(OperatorNodePushableAction opAction) throws 
HyracksDataException {
         List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
-        // Run one action for all OperatorNodePushables in parallel through a 
thread pool.
-        for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
-            initializationTasks.add(ctx.getExecutorService().submit(new 
Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    opAction.runAction(op);
-                    return null;
-                }
-            }));
-        }
-
-        // Waits until all parallel actions to finish.
-        for (Future<Void> initializationTask : initializationTasks) {
-            try {
-                initializationTask.get();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
+        try {
+            // Run one action for all OperatorNodePushables in parallel 
through a thread pool.
+            for (final IOperatorNodePushable op : 
operatorNodePushablesBFSOrder) {
+                initializationTasks.add(ctx.getExecutorService().submit(new 
Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        opAction.runAction(op);
+                        return null;
+                    }
+                }));
             }
+            // Waits until all parallel actions to finish.
+            for (Future<Void> initializationTask : initializationTasks) {
+                initializationTask.get();
+            }
+        } catch (Throwable th) {
+            for (Future<Void> initializationTask : initializationTasks) {
+                initializationTask.cancel(true);
+            }
+            throw new HyracksDataException(th);
         }
     }
 }
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 12df264..61baf82 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -255,8 +255,8 @@
         addPendingThread(ct);
         try {
             ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
-            operator.initialize();
             try {
+                operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/582
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I603d3c101e0a4de4816eb5a6a7fd4320df317ce4
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to