Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1243
Change subject: Fix potential hanging in op.initialize().
......................................................................
Fix potential hanging in op.initialize().
Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
2 files changed, 23 insertions(+), 5 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/43/1243/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index f463bfa..ff60d42 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -235,8 +235,12 @@
}
}
- private synchronized void addPendingThread(Thread t) {
+ private synchronized boolean addPendingThread(Thread t) {
+ if (aborted) {
+ return false;
+ }
pendingThreads.add(t);
+ return true;
}
private synchronized void removePendingThread(Thread t) {
@@ -256,7 +260,14 @@
public void run() {
Thread ct = Thread.currentThread();
String threadName = ct.getName();
- addPendingThread(ct);
+ // Calls synchronized addPendingThread(..) to make sure that in the
abort() method,
+ // the thread is not escaped from interruption.
+ if (!addPendingThread(ct)) {
+ exceptions.add(new InterruptedException("Task gets aborted!"));
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this,
exceptions));
+ return;
+ }
try {
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
try {
@@ -271,11 +282,12 @@
executorService.execute(new Runnable() {
@Override
public void run() {
- if (aborted) {
+ Thread thread = Thread.currentThread();
+ // Calls synchronized addPendingThread(..) to
make sure that in the abort() method,
+ // the thread is not escaped from interruption.
+ if (!addPendingThread(thread)) {
return;
}
- Thread thread = Thread.currentThread();
- addPendingThread(thread);
String oldName = thread.getName();
thread.setName(displayName + ":" +
taskAttemptId + ":" + cIdx);
thread.setPriority(Thread.MIN_PRIORITY);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 6a7a6a7..9cc23fa 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -32,6 +32,12 @@
@Test
public void failureOnCreatePushRuntime() throws Exception {
+ for (int round = 0; round < 1000; ++round) {
+ runTest();
+ }
+ }
+
+ private void runTest() throws Exception {
JobSpecification spec = new JobSpecification();
AbstractSingleActivityOperatorDescriptor sourceOpDesc = new
ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
0, 1, new int[] { 4 }, true);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1243
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>