abdullah alamoudi has submitted this change and it was merged.

Change subject: ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
......................................................................


ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable

Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1681
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
BAD: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Yingyi Bu <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
2 files changed, 49 insertions(+), 48 deletions(-)

Approvals:
  Yingyi Bu: Looks good to me, approved
  Jenkins: Verified; No violations found; No violations found; Verified



diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 4c0eb1b..c9cdb2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -20,6 +20,8 @@
 package org.apache.hyracks.api.exceptions;
 
 import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -29,10 +31,16 @@
 public class HyracksDataException extends HyracksException {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
Logger.getLogger(HyracksDataException.class.getName());
 
     public static HyracksDataException create(Throwable cause) {
         if (cause instanceof HyracksDataException || cause == null) {
             return (HyracksDataException) cause;
+        }
+        if (cause instanceof InterruptedException && 
!Thread.currentThread().isInterrupted()) {
+            LOGGER.log(Level.WARNING,
+                    "Wrapping an InterruptedException in HyracksDataException 
and current thread is not interrupted",
+                    cause);
         }
         return new HyracksDataException(cause);
     }
@@ -46,8 +54,8 @@
     }
 
     public static HyracksDataException create(HyracksDataException e, String 
nodeId) {
-        return new HyracksDataException(e.getComponent(), e.getErrorCode(), 
e.getMessage(), e.getCause(), nodeId, e
-                .getParams());
+        return new HyracksDataException(e.getComponent(), e.getErrorCode(), 
e.getMessage(), e.getCause(), nodeId,
+                e.getParams());
     }
 
     public static HyracksDataException suppress(HyracksDataException root, 
Throwable th) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 1c4f916..eeaee04 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,14 +20,14 @@
 package org.apache.hyracks.api.rewriter.runtime;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,12 +43,10 @@
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of 
one-to-one
  * connected activities in a single thread.
- *
- * @author yingyib
  */
 public class SuperActivityOperatorNodePushable implements 
IOperatorNodePushable {
-    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables 
= new HashMap<ActivityId, IOperatorNodePushable>();
-    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = 
new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables 
= new HashMap<>();
+    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = 
new ArrayList<>();
     private final Map<ActivityId, IActivity> startActivities;
     private final SuperActivity parent;
     private final IHyracksTaskContext ctx;
@@ -56,7 +54,6 @@
     private final int partition;
     private final int nPartitions;
     private int inputArity = 0;
-    private boolean[] startedInitialization;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, 
Map<ActivityId, IActivity> startActivities,
             IHyracksTaskContext ctx, IRecordDescriptorProvider 
recordDescProvider, int partition, int nPartitions) {
@@ -80,24 +77,25 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        // Initializes all OperatorNodePushables in parallel and then finally 
deinitializes them.
-        runInParallel((op, index) -> {
-            startedInitialization[index] = true;
-            op.initialize();
-        });
+        runInParallel(op -> op.initialize());
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        runInParallel(op -> op.deinitialize());
     }
 
     private void init() throws HyracksDataException {
-        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = 
new HashMap<ActivityId, IOperatorNodePushable>();
-        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> 
childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, 
Integer>>>();
-        List<IConnectorDescriptor> outputConnectors = null;
+        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = 
new HashMap<>();
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> 
childQueue = new LinkedList<>();
+        List<IConnectorDescriptor> outputConnectors;
 
         /**
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = 
entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, 
recordDescProvider, partition, nPartitions);
             startOperatorNodePushables.put(entry.getKey(), opPushable);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
@@ -154,19 +152,6 @@
                 }
             }
         }
-
-        // Sets the startedInitialization flags to be false.
-        startedInitialization = new 
boolean[operatorNodePushablesBFSOrder.size()];
-        Arrays.fill(startedInitialization, false);
-    }
-
-    @Override
-    public void deinitialize() throws HyracksDataException {
-        runInParallel((op, index) -> {
-            if (startedInitialization[index]) {
-                op.deinitialize();
-            }
-        });
     }
 
     @Override
@@ -192,8 +177,7 @@
          */
         Pair<ActivityId, Integer> activityIdInputIndex = 
parent.getActivityIdInputIndex(index);
         IOperatorNodePushable operatorNodePushable = 
operatorNodePushables.get(activityIdInputIndex.getLeft());
-        IFrameWriter writer = 
operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
-        return writer;
+        return 
operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
     }
 
     @Override
@@ -201,31 +185,40 @@
         return "Super Activity " + parent.getActivityMap().values().toString();
     }
 
+    @FunctionalInterface
     interface OperatorNodePushableAction {
-        void runAction(IOperatorNodePushable op, int opIndex) throws 
HyracksDataException;
+        void run(IOperatorNodePushable op) throws HyracksDataException;
     }
 
-    private void runInParallel(OperatorNodePushableAction opAction) throws 
HyracksDataException {
-        List<Future<Void>> initializationTasks = new ArrayList<>();
+    private void runInParallel(OperatorNodePushableAction action) throws 
HyracksDataException {
+        List<Future<Void>> tasks = new ArrayList<>();
+        final Semaphore startSemaphore = new Semaphore(1 - 
operatorNodePushablesBFSOrder.size());
+        final Semaphore completeSemaphore = new Semaphore(1 - 
operatorNodePushablesBFSOrder.size());
         try {
-            int index = 0;
-            // Run one action for all OperatorNodePushables in parallel 
through a thread pool.
             for (final IOperatorNodePushable op : 
operatorNodePushablesBFSOrder) {
-                final int opIndex = index++;
-                initializationTasks.add(ctx.getExecutorService().submit(() -> {
-                    opAction.runAction(op, opIndex);
+                tasks.add(ctx.getExecutorService().submit(() -> {
+                    startSemaphore.release();
+                    try {
+                        action.run(op);
+                    } finally {
+                        completeSemaphore.release();
+                    }
                     return null;
                 }));
             }
-            // Waits until all parallel actions to finish.
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.get();
+            for (Future<Void> task : tasks) {
+                task.get();
             }
         } catch (Exception e) {
-            for (Future<Void> initializationTask : initializationTasks) {
-                initializationTask.cancel(true);
+            try {
+                startSemaphore.acquireUninterruptibly();
+                for (Future<Void> task : tasks) {
+                    task.cancel(true);
+                }
+            } finally {
+                completeSemaphore.acquireUninterruptibly();
             }
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1681
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to