Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1775
Change subject: Misc. Cleanup / InterruptedException Handling
......................................................................
Misc. Cleanup / InterruptedException Handling
Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2
---
M hyracks-fullstack/hyracks/hyracks-api/pom.xml
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
3 files changed, 47 insertions(+), 39 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/75/1775/1
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 0beee6f..ddba2d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -98,5 +98,9 @@
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
index 2f3c72b..28e098f 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -65,7 +65,7 @@
* extract start activities
*/
List<IConnectorDescriptor> conns =
getActivityInputMap().get(entry.getKey());
- if (conns == null || conns.size() == 0) {
+ if (conns == null || conns.isEmpty()) {
startActivities.put(entry.getKey(), entry.getValue());
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index eeaee04..314bf8b 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.api.rewriter.runtime;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -29,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -64,7 +66,7 @@
this.partition = partition;
this.nPartitions = nPartitions;
- /**
+ /*
* initialize the writer-relationship for the internal DAG of operator
* node pushables
*/
@@ -77,43 +79,39 @@
@Override
public void initialize() throws HyracksDataException {
- runInParallel(op -> op.initialize());
+ runInParallel(IOperatorNodePushable::initialize);
}
@Override
public void deinitialize() throws HyracksDataException {
- runInParallel(op -> op.deinitialize());
+ runInParallel(IOperatorNodePushable::deinitialize);
}
private void init() throws HyracksDataException {
- Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables =
new HashMap<>();
Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>
childQueue = new LinkedList<>();
List<IConnectorDescriptor> outputConnectors;
- /**
+ /*
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable =
- entry.getValue().createPushRuntime(ctx,
recordDescProvider, partition, nPartitions);
- startOperatorNodePushables.put(entry.getKey(), opPushable);
+ IOperatorNodePushable opPushable =
entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+ nPartitions);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
- outputConnectors =
parent.getActivityOutputMap().get(entry.getKey());
- if (outputConnectors != null) {
- for (IConnectorDescriptor conn : outputConnectors) {
-
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
- }
+ outputConnectors =
MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
+ Collections.emptyList());
+ for (IConnectorDescriptor conn : outputConnectors) {
+
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
}
- /**
- * Using BFS (breadth-first search) to construct to runtime execution
- * DAG;
+ /*
+ * Using BFS (breadth-first search) to construct the runtime execution
DAG...
*/
- while (childQueue.size() > 0) {
- /**
+ while (!childQueue.isEmpty()) {
+ /*
* construct the source to destination information
*/
Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel =
childQueue.poll();
@@ -130,25 +128,23 @@
operatorNodePushables.put(destId, destOp);
}
- /**
+ /*
* construct the dataflow connection from a producer to a consumer
*/
sourceOp.setOutputFrameWriter(outputChannel,
destOp.getInputFrameWriter(inputChannel),
recordDescProvider.getInputRecordDescriptor(destId,
inputChannel));
- /**
+ /*
* traverse to the child of the current activity
*/
- outputConnectors = parent.getActivityOutputMap().get(destId);
+ outputConnectors =
MapUtils.getObject(parent.getActivityOutputMap(), destId,
Collections.emptyList());
- /**
+ /*
* expend the executing activities further to the downstream
*/
- if (outputConnectors != null && outputConnectors.size() > 0) {
- for (IConnectorDescriptor conn : outputConnectors) {
- if (conn != null) {
-
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
- }
+ for (IConnectorDescriptor conn : outputConnectors) {
+ if (conn != null) {
+
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
}
}
@@ -162,7 +158,7 @@
@Override
public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter
writer, RecordDescriptor recordDesc)
throws HyracksDataException {
- /**
+ /*
* set the right output frame writer
*/
Pair<ActivityId, Integer> activityIdOutputIndex =
parent.getActivityIdOutputIndex(clusterOutputIndex);
@@ -172,7 +168,7 @@
@Override
public IFrameWriter getInputFrameWriter(final int index) {
- /**
+ /*
* get the right IFrameWriter from the cluster input index
*/
Pair<ActivityId, Integer> activityIdInputIndex =
parent.getActivityIdInputIndex(index);
@@ -209,16 +205,24 @@
for (Future<Void> task : tasks) {
task.get();
}
- } catch (Exception e) {
- try {
- startSemaphore.acquireUninterruptibly();
- for (Future<Void> task : tasks) {
- task.cancel(true);
- }
- } finally {
- completeSemaphore.acquireUninterruptibly();
- }
+ } catch (InterruptedException e) {
+ cancelTasks(tasks, startSemaphore, completeSemaphore);
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
+ } catch (Exception e) {
+ cancelTasks(tasks, startSemaphore, completeSemaphore);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void cancelTasks(List<Future<Void>> tasks, Semaphore
startSemaphore, Semaphore completeSemaphore) {
+ try {
+ startSemaphore.acquireUninterruptibly();
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
+ }
+ } finally {
+ completeSemaphore.acquireUninterruptibly();
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1775
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0059ec85f8376160bb40bad721f3a8e291ad8ac2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>