Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1825

Change subject: Cancel the on-going job if waitForCompletion is interrupted.
......................................................................

Cancel the on-going job if waitForCompletion is interrupted.

Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
3 files changed, 63 insertions(+), 12 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/25/1825/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..dd132e9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -138,7 +138,15 @@
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        hci.waitForCompletion(jobId);
+        try {
+            hci.waitForCompletion(jobId);
+        } catch (InterruptedException e) {
+            // Cancels the ongoing job if the current thread gets interrupted.
+            hci.cancelJob(jobId);
+            // Re-interrupt the current thread to make sure subsequent operations 
get interrupted.
+            Thread.currentThread().interrupt();
+            throw e;
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 148d4f5..6565c5a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +38,7 @@
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.control.cc.BaseCCApplication;
@@ -54,6 +53,9 @@
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -133,6 +135,10 @@
         hcc.waitForCompletion(jobId);
     }
 
+    protected JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hcc.getJobStatus(jobId);
+    }
+
     protected void cancelJob(JobId jobId) throws Exception {
         hcc.cancelJob(jobId);
     }
@@ -205,15 +211,6 @@
                 }
             }
         }
-    }
-
-    protected File createTempFile() throws IOException {
-        File tempFile = File.createTempFile(getClass().getName(), ".tmp", 
outputFolder.getRoot());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Output file: " + tempFile.getAbsolutePath());
-        }
-        outputFiles.add(tempFile);
-        return tempFile;
     }
 
     public static class DummyApplication extends BaseCCApplication {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7c3b66f..31f9224 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import 
org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -59,6 +60,16 @@
 import org.junit.Test;
 
 public class CancelJobTest extends AbstractMultiNCIntegrationTest {
+
+    private JobId jobIdForInterruptTest = null;
+
+    @Test
+    public void interruptJobClientAfterWaitForCompletion() throws Exception {
+        // Interrupts the job client after waitForCompletion() is called.
+        for (JobSpecification spec : testJobs()) {
+            interruptAfterWaitForCompletion(spec);
+        }
+    }
 
     @Test
     public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
@@ -167,6 +178,41 @@
         }
     }
 
+    private void interruptAfterWaitForCompletion(JobSpecification spec) throws 
Exception {
+        // Submits the job
+        Thread thread = new Thread(() -> {
+            try {
+                jobIdForInterruptTest = startJob(spec);
+                waitForCompletion(jobIdForInterruptTest);
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof InterruptedException);
+            }
+        });
+        thread.start();
+
+        // Waits 1 second.
+        synchronized (this) {
+            wait(1000);
+        }
+
+        // Interrupt the job client.
+        thread.interrupt();
+
+        // Waits until the thread terminates.
+        thread.join();
+
+        // Verifies the job status.
+        JobStatus jobStatus = getJobStatus(jobIdForInterruptTest);
+        while (jobStatus == JobStatus.RUNNING) {
+            // Waits 1 second.
+            synchronized (this) {
+                wait(1000);
+            }
+            jobStatus = getJobStatus(jobIdForInterruptTest);
+        }
+        Assert.assertTrue(jobStatus == JobStatus.FAILURE);
+    }
+
     private void cancelWithoutWait(JobSpecification spec) throws Exception {
         JobId jobId = startJob(spec);
         cancelJob(jobId);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1825
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to