abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2500
Change subject: [NO ISSUE][RT] Report all errors on
SuperActivityOperatorNodePushable
......................................................................
[NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Currently, if a failure happens in SuperActivityOperatorNodePushable,
we report only that failure and lose any other failures that occurred.
This is especially critical in case of job cancellation, since we
then cannot tell where each thread was interrupted.
- After this change, all other failures are attached to the root
failure as suppressed exceptions, so that every failure is reported.
Change-Id: Ibbf31dd91303ce2f606734fcccb19270875266b3
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
2 files changed, 46 insertions(+), 41 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/00/2500/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
/**
* The runtime of a SuperActivity, which internally executes a DAG of
one-to-one
@@ -193,15 +195,20 @@
}
private void runInParallel(OperatorNodePushableAction action) throws
HyracksDataException {
- List<Future<Void>> tasks = new ArrayList<>();
+ List<Future<Void>> tasks = new
ArrayList<>(operatorNodePushablesBFSOrder.size());
+ Queue<Throwable> failures = new
ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
final Semaphore startSemaphore = new Semaphore(1 -
operatorNodePushablesBFSOrder.size());
final Semaphore completeSemaphore = new Semaphore(1 -
operatorNodePushablesBFSOrder.size());
+ Throwable root = null;
try {
for (final IOperatorNodePushable op :
operatorNodePushablesBFSOrder) {
tasks.add(ctx.getExecutorService().submit(() -> {
startSemaphore.release();
try {
action.run(op);
+ } catch (Throwable th) { // NOSONAR: Must catch all causes
of failure
+ failures.offer(th);
+ throw th;
} finally {
completeSemaphore.release();
}
@@ -211,13 +218,16 @@
for (Future<Void> task : tasks) {
task.get();
}
- } catch (InterruptedException e) {
- cancelTasks(tasks, startSemaphore, completeSemaphore);
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
} catch (ExecutionException e) {
+ root = e.getCause();
+ } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+ root = e;
+ }
+ if (root != null) {
+ final Throwable failure = root;
cancelTasks(tasks, startSemaphore, completeSemaphore);
- throw HyracksDataException.create(e.getCause());
+ failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+ throw HyracksDataException.create(failure);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@
private volatile boolean aborted;
- private NodeControllerService ncs;
+ private final NodeControllerService ncs;
private List<List<PartitionChannel>> inputChannelsFromConnectors;
@@ -286,67 +286,62 @@
}
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
try {
- Exception operatorException = null;
+ Throwable operatorException = null;
try {
operator.initialize();
if (collectors.length > 0) {
final Semaphore sem = new Semaphore(collectors.length - 1);
for (int i = 1; i < collectors.length; ++i) {
+ // Q. Do we ever have a task that has more than one
collector?
final IPartitionCollector collector = collectors[i];
final IFrameWriter writer =
operator.getInputFrameWriter(i);
- sem.acquire();
+ sem.acquireUninterruptibly();
final int cIdx = i;
executorService.execute(() -> {
- Thread thread = Thread.currentThread();
- // Calls synchronized addPendingThread(..) to make
sure that in the abort() method,
- // the thread is not escaped from interruption.
- if (!addPendingThread(thread)) {
- return;
- }
- thread.setName(displayName + ":" + taskAttemptId +
":" + cIdx);
- thread.setPriority(Thread.MIN_PRIORITY);
try {
- pushFrames(collector,
inputChannelsFromConnectors.get(cIdx), writer);
- } catch (HyracksDataException e) {
- synchronized (Task.this) {
- exceptions.add(e);
+ Thread thread = Thread.currentThread();
+ // Calls synchronized addPendingThread(..) to
make sure that in the abort() method,
+ // the thread is not escaped from interruption.
+ if (!addPendingThread(thread)) {
+ return;
+ }
+ thread.setName(displayName + ":" +
taskAttemptId + ":" + cIdx);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ try {
+ pushFrames(collector,
inputChannelsFromConnectors.get(cIdx), writer);
+ } catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ exceptions.add(e);
+ }
+ } finally {
+ removePendingThread(thread);
}
} finally {
sem.release();
- removePendingThread(thread);
}
});
}
try {
pushFrames(collectors[0],
inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
} finally {
- sem.acquire(collectors.length - 1);
+ sem.acquireUninterruptibly(collectors.length - 1);
}
}
- } catch (Exception e) {
- // Store the operator exception
+ } catch (Throwable e) { // NOSONAR: Must catch all failures
operatorException = e;
- throw e;
} finally {
try {
operator.deinitialize();
- } catch (Exception e) {
- if (operatorException != null) {
- // Add deinitialize exception to the operator
exception to keep track of both
- operatorException.addSuppressed(e);
- } else {
- operatorException = e;
- }
- throw operatorException;
+ } catch (Throwable e) { // NOSONAR: Must catch all failures
+ operatorException =
ExceptionUtils.suppress(operatorException, e);
}
}
- NodeControllerService ncs = joblet.getNodeController();
+ if (operatorException != null) {
+ throw operatorException;
+ }
ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
- } catch (InterruptedException e) {
- exceptions.add(e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- exceptions.add(e);
+ } catch (Throwable e) { // NOSONAR: Catch all failures
+ exceptions.add(HyracksDataException.create(e));
} finally {
close();
removePendingThread(ct);
@@ -360,7 +355,6 @@
exceptions.get(i));
}
}
- NodeControllerService ncs = joblet.getNodeController();
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue()
.schedule(new NotifyTaskFailureWork(ncs, this, exceptions,
joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@
return
ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name,
start, length);
}
+ @Override
public Set<JobFlag> getJobFlags() {
return jobFlags;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2500
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibbf31dd91303ce2f606734fcccb19270875266b3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: release-0.9.4-pre-rc
Gerrit-Owner: abdullah alamoudi <[email protected]>