abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1681
Change subject: ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
......................................................................
ASTERIXDB-1838 Fix SuperActivityOperatorNodePushable
Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
1 file changed, 86 insertions(+), 24 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/81/1681/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 1c4f916..523b468 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -25,9 +25,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,20 +44,22 @@
/**
* The runtime of a SuperActivity, which internally executes a DAG of
one-to-one
* connected activities in a single thread.
- *
- * @author yingyib
*/
public class SuperActivityOperatorNodePushable implements
IOperatorNodePushable {
- private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables
= new HashMap<ActivityId, IOperatorNodePushable>();
- private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder =
new ArrayList<IOperatorNodePushable>();
+ private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables
= new HashMap<>();
+ private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder =
new ArrayList<>();
private final Map<ActivityId, IActivity> startActivities;
private final SuperActivity parent;
private final IHyracksTaskContext ctx;
private final IRecordDescriptorProvider recordDescProvider;
+ private final ReentrantReadWriteLock actionLock = new
ReentrantReadWriteLock();
+ private volatile boolean cancelled = false;
private final int partition;
private final int nPartitions;
private int inputArity = 0;
private boolean[] startedInitialization;
+ private boolean[] completedInitialization;
+ private boolean[] completedDeinitialization;
public SuperActivityOperatorNodePushable(SuperActivity parent,
Map<ActivityId, IActivity> startActivities,
IHyracksTaskContext ctx, IRecordDescriptorProvider
recordDescProvider, int partition, int nPartitions) {
@@ -82,22 +85,37 @@
public void initialize() throws HyracksDataException {
// Initializes all OperatorNodePushables in parallel and then finally
deinitializes them.
runInParallel((op, index) -> {
- startedInitialization[index] = true;
- op.initialize();
- });
+ actionLock.readLock().lock();
+ try {
+ if (cancelled) {
+ return;
+ }
+ startedInitialization[index] = true;
+ } finally {
+ actionLock.readLock().unlock();
+ }
+ try {
+ op.initialize();
+ } finally {
+ synchronized (op) {
+ completedInitialization[index] = true;
+ op.notify();
+ }
+ }
+ }, true);
}
private void init() throws HyracksDataException {
- Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables =
new HashMap<ActivityId, IOperatorNodePushable>();
- Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>
childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity,
Integer>>>();
- List<IConnectorDescriptor> outputConnectors = null;
+ Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables =
new HashMap<>();
+ Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>
childQueue = new LinkedList<>();
+ List<IConnectorDescriptor> outputConnectors;
/**
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable =
entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
- nPartitions);
+ IOperatorNodePushable opPushable =
+ entry.getValue().createPushRuntime(ctx,
recordDescProvider, partition, nPartitions);
startOperatorNodePushables.put(entry.getKey(), opPushable);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
@@ -157,16 +175,38 @@
// Sets the startedInitialization flags to be false.
startedInitialization = new
boolean[operatorNodePushablesBFSOrder.size()];
+ completedInitialization = new
boolean[operatorNodePushablesBFSOrder.size()];
+ completedDeinitialization = new
boolean[operatorNodePushablesBFSOrder.size()];
Arrays.fill(startedInitialization, false);
+ Arrays.fill(completedInitialization, false);
+ Arrays.fill(completedDeinitialization, false);
}
@Override
public void deinitialize() throws HyracksDataException {
runInParallel((op, index) -> {
if (startedInitialization[index]) {
- op.deinitialize();
+ if (!completedInitialization[index]) {
+ synchronized (op) {
+ while (!completedInitialization[index]) {
+ try {
+ op.wait();
+ } catch (InterruptedException e) {
+ // Ignore on purpose. we have to deinitialize
+ }
+ }
+ }
+ }
+ try {
+ op.deinitialize();
+ } finally {
+ synchronized (SuperActivityOperatorNodePushable.this) {
+ completedDeinitialization[index] = true;
+ SuperActivityOperatorNodePushable.this.notify();
+ }
+ }
}
- });
+ }, false);
}
@Override
@@ -192,8 +232,7 @@
*/
Pair<ActivityId, Integer> activityIdInputIndex =
parent.getActivityIdInputIndex(index);
IOperatorNodePushable operatorNodePushable =
operatorNodePushables.get(activityIdInputIndex.getLeft());
- IFrameWriter writer =
operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
- return writer;
+ return
operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
}
@Override
@@ -205,25 +244,48 @@
void runAction(IOperatorNodePushable op, int opIndex) throws
HyracksDataException;
}
- private void runInParallel(OperatorNodePushableAction opAction) throws
HyracksDataException {
- List<Future<Void>> initializationTasks = new ArrayList<>();
+ private void runInParallel(OperatorNodePushableAction opAction, boolean
initialize) throws HyracksDataException {
+ List<Future<Void>> tasks = new ArrayList<>();
try {
int index = 0;
// Run one action for all OperatorNodePushables in parallel
through a thread pool.
for (final IOperatorNodePushable op :
operatorNodePushablesBFSOrder) {
final int opIndex = index++;
- initializationTasks.add(ctx.getExecutorService().submit(() -> {
+ tasks.add(ctx.getExecutorService().submit(() -> {
opAction.runAction(op, opIndex);
return null;
}));
}
// Waits until all parallel actions to finish.
- for (Future<Void> initializationTask : initializationTasks) {
- initializationTask.get();
+ for (Future<Void> task : tasks) {
+ task.get();
}
} catch (Exception e) {
- for (Future<Void> initializationTask : initializationTasks) {
- initializationTask.cancel(true);
+ if (initialize) {
+ actionLock.writeLock().lock();
+ try {
+ cancelled = true;
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
+ }
+ } finally {
+ actionLock.writeLock().unlock();
+ }
+ } else {
+ // wait for completion
+ for (int index = 0; index < tasks.size(); index++) {
+ if (completedInitialization[index] &&
!completedDeinitialization[index]) {
+ synchronized (SuperActivityOperatorNodePushable.this) {
+ while (!completedDeinitialization[index]) {
+ try {
+
SuperActivityOperatorNodePushable.this.wait();
+ } catch (InterruptedException interrupt) {
+ // ignoring on purpose. we have to
deinitialize what we have initialized
+ }
+ }
+ }
+ }
+ }
}
throw new HyracksDataException(e);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1681
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie5994f8a51dcf43e42325e89215758c310cd7b99
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>