Michael Blow has submitted this change and it was merged.

Change subject: Misc. Cleanup / InterruptedException Handling
......................................................................
Misc. Cleanup / InterruptedException Handling Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1775 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/pom.xml M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java 3 files changed, 47 insertions(+), 39 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml index 0beee6f..ddba2d8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml @@ -98,5 +98,9 @@ <groupId>args4j</groupId> <artifactId>args4j</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java index 2f3c72b..28e098f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java @@ -65,7 +65,7 @@ * extract start activities */ List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey()); - if (conns == null || conns.size() == 0) { + if (conns == null || 
conns.isEmpty()) { startActivities.put(entry.getKey(), entry.getValue()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index eeaee04..314bf8b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -20,6 +20,7 @@ package org.apache.hyracks.api.rewriter.runtime; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -29,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -64,7 +66,7 @@ this.partition = partition; this.nPartitions = nPartitions; - /** + /* * initialize the writer-relationship for the internal DAG of operator * node pushables */ @@ -77,43 +79,39 @@ @Override public void initialize() throws HyracksDataException { - runInParallel(op -> op.initialize()); + runInParallel(IOperatorNodePushable::initialize); } @Override public void deinitialize() throws HyracksDataException { - runInParallel(op -> op.deinitialize()); + runInParallel(IOperatorNodePushable::deinitialize); } private void init() throws HyracksDataException { - Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<>(); Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>(); List<IConnectorDescriptor> outputConnectors; - /** + /* * Set up the source 
operators */ for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) { - IOperatorNodePushable opPushable = - entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions); - startOperatorNodePushables.put(entry.getKey(), opPushable); + IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, + nPartitions); operatorNodePushablesBFSOrder.add(opPushable); operatorNodePushables.put(entry.getKey(), opPushable); inputArity += opPushable.getInputArity(); - outputConnectors = parent.getActivityOutputMap().get(entry.getKey()); - if (outputConnectors != null) { - for (IConnectorDescriptor conn : outputConnectors) { - childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); - } + outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), + Collections.emptyList()); + for (IConnectorDescriptor conn : outputConnectors) { + childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); } } - /** - * Using BFS (breadth-first search) to construct to runtime execution - * DAG; + /* + * Using BFS (breadth-first search) to construct to runtime execution DAG... 
*/ - while (childQueue.size() > 0) { - /** + while (!childQueue.isEmpty()) { + /* * construct the source to destination information */ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll(); @@ -130,25 +128,23 @@ operatorNodePushables.put(destId, destOp); } - /** + /* * construct the dataflow connection from a producer to a consumer */ sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel), recordDescProvider.getInputRecordDescriptor(destId, inputChannel)); - /** + /* * traverse to the child of the current activity */ - outputConnectors = parent.getActivityOutputMap().get(destId); + outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), destId, Collections.emptyList()); - /** + /* * expend the executing activities further to the downstream */ - if (outputConnectors != null && outputConnectors.size() > 0) { - for (IConnectorDescriptor conn : outputConnectors) { - if (conn != null) { - childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); - } + for (IConnectorDescriptor conn : outputConnectors) { + if (conn != null) { + childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId())); } } } @@ -162,7 +158,7 @@ @Override public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException { - /** + /* * set the right output frame writer */ Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex); @@ -172,7 +168,7 @@ @Override public IFrameWriter getInputFrameWriter(final int index) { - /** + /* * get the right IFrameWriter from the cluster input index */ Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index); @@ -209,16 +205,24 @@ for (Future<Void> task : tasks) { task.get(); } - } catch (Exception e) { - try { - startSemaphore.acquireUninterruptibly(); - for (Future<Void> task : tasks) { - task.cancel(true); - } - } 
finally { - completeSemaphore.acquireUninterruptibly(); - } + } catch (InterruptedException e) { + cancelTasks(tasks, startSemaphore, completeSemaphore); + Thread.currentThread().interrupt(); throw HyracksDataException.create(e); + } catch (Exception e) { + cancelTasks(tasks, startSemaphore, completeSemaphore); + throw HyracksDataException.create(e); + } + } + + private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) { + try { + startSemaphore.acquireUninterruptibly(); + for (Future<Void> task : tasks) { + task.cancel(true); + } + } finally { + completeSemaphore.acquireUninterruptibly(); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1775 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
