Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1243

Change subject: Fix potential hanging in op.initialize().
......................................................................

Fix potential hanging in op.initialize().

Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
2 files changed, 23 insertions(+), 5 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/43/1243/1

diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index f463bfa..ff60d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -235,8 +235,12 @@
         }
     }
 
-    private synchronized void addPendingThread(Thread t) {
+    private synchronized boolean addPendingThread(Thread t) {
+        if (aborted) {
+            return false;
+        }
         pendingThreads.add(t);
+        return true;
     }
 
     private synchronized void removePendingThread(Thread t) {
@@ -256,7 +260,14 @@
     public void run() {
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
-        addPendingThread(ct);
+        // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+        // the thread is not escaped from interruption.
+        if (!addPendingThread(ct)) {
+            exceptions.add(new InterruptedException("Task gets aborted!"));
+            ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
+            return;
+        }
         try {
             ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
             try {
@@ -271,11 +282,12 @@
                         executorService.execute(new Runnable() {
                             @Override
                             public void run() {
-                                if (aborted) {
+                                Thread thread = Thread.currentThread();
+                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                                // the thread is not escaped from interruption.
+                                if (!addPendingThread(thread)) {
                                     return;
                                 }
-                                Thread thread = Thread.currentThread();
-                                addPendingThread(thread);
                                 String oldName = thread.getName();
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
                                 thread.setPriority(Thread.MIN_PRIORITY);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 6a7a6a7..9cc23fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -32,6 +32,12 @@
 
     @Test
     public void failureOnCreatePushRuntime() throws Exception {
+        for (int round = 0; round < 1000; ++round) {
+            runTest();
+        }
+    }
+
+    private void runTest() throws Exception {
         JobSpecification spec = new JobSpecification();
         AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
                 0, 1, new int[] { 4 }, true);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1243
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I701b271bc6dc78e67274fa845dec013756843a70
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to