abdullah alamoudi has submitted this change and it was merged.

Change subject: ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
......................................................................
ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1681 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java 2 files changed, 49 insertions(+), 48 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index 4c0eb1b..c9cdb2d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -20,6 +20,8 @@ package org.apache.hyracks.api.exceptions; import java.io.Serializable; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.util.ErrorMessageUtil; @@ -29,10 +31,16 @@ public class HyracksDataException extends HyracksException { private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(HyracksDataException.class.getName()); public static HyracksDataException create(Throwable cause) { if (cause instanceof HyracksDataException || cause == null) { return (HyracksDataException) cause; + } + if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) { + 
LOGGER.log(Level.WARNING, + "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted", + cause); } return new HyracksDataException(cause); } @@ -46,8 +54,8 @@ } public static HyracksDataException create(HyracksDataException e, String nodeId) { - return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e - .getParams()); + return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, + e.getParams()); } public static HyracksDataException suppress(HyracksDataException root, Throwable th) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 1c4f916..eeaee04 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -20,14 +20,14 @@ package org.apache.hyracks.api.rewriter.runtime; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Map.Entry; +import java.util.Queue; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.comm.IFrameWriter; @@ -43,12 +43,10 @@ /** * The runtime of a SuperActivity, which internally executes a DAG of one-to-one * connected activities in a single thread. 
- * - * @author yingyib */ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable { - private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>(); - private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>(); + private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>(); + private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>(); private final Map<ActivityId, IActivity> startActivities; private final SuperActivity parent; private final IHyracksTaskContext ctx; @@ -56,7 +54,6 @@ private final int partition; private final int nPartitions; private int inputArity = 0; - private boolean[] startedInitialization; public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities, IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { @@ -80,24 +77,25 @@ @Override public void initialize() throws HyracksDataException { - // Initializes all OperatorNodePushables in parallel and then finally deinitializes them. 
- runInParallel((op, index) -> { - startedInitialization[index] = true; - op.initialize(); - }); + runInParallel(op -> op.initialize()); + } + + @Override + public void deinitialize() throws HyracksDataException { + runInParallel(op -> op.deinitialize()); } private void init() throws HyracksDataException { - Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>(); - Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>(); - List<IConnectorDescriptor> outputConnectors = null; + Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>(); + Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); + List<IConnectorDescriptor> outputConnectors; /** * Set up the source operators */ for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, - nPartitions); + IOperatorNodePushable opPushable = + entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); startOperatorNodePushables.put(entry.getKey(), opPushable); operatorNodePushablesBFSOrder.add(opPushable); operatorNodePushables.put(entry.getKey(), opPushable); @@ -154,19 +152,6 @@ } } } - - // Sets the startedInitialization flags to be false. 
- startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()]; - Arrays.fill(startedInitialization, false); - } - - @Override - public void deinitialize() throws HyracksDataException { - runInParallel((op, index) -> { - if (startedInitialization[index]) { - op.deinitialize(); - } - }); } @Override @@ -192,8 +177,7 @@ */ Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index); IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft()); - IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight()); - return writer; + return operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight()); } @Override @@ -201,31 +185,40 @@ return "Super Activity " + parent.getActivityMap().values().toString(); } + @FunctionalInterface interface OperatorNodePushableAction { - void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException; + void run(IOperatorNodePushable op) throws HyracksDataException; } - private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException { - List<Future<Void>> initializationTasks = new ArrayList<>(); + private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException { + List<Future<Void>> tasks = new ArrayList<>(); + final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size()); + final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size()); try { - int index = 0; - // Run one action for all OperatorNodePushables in parallel through a thread pool. 
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) { - final int opIndex = index++; - initializationTasks.add(ctx.getExecutorService().submit(() -> { - opAction.runAction(op, opIndex); + tasks.add(ctx.getExecutorService().submit(() -> { + startSemaphore.release(); + try { + action.run(op); + } finally { + completeSemaphore.release(); + } return null; })); } - // Waits until all parallel actions to finish. - for (Future<Void> initializationTask : initializationTasks) { - initializationTask.get(); + for (Future<Void> task : tasks) { + task.get(); } } catch (Exception e) { - for (Future<Void> initializationTask : initializationTasks) { - initializationTask.cancel(true); + try { + startSemaphore.acquireUninterruptibly(); + for (Future<Void> task : tasks) { + task.cancel(true); + } + } finally { + completeSemaphore.acquireUninterruptibly(); } - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1681 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
