Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1825
Change subject: Cancel the on-going job if waitForCompletion is interrupted.
......................................................................
Cancel the on-going job if waitForCompletion is interrupted.
Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
3 files changed, 63 insertions(+), 12 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/1825/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..dd132e9 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -138,7 +138,15 @@
@Override
public void waitForCompletion(JobId jobId) throws Exception {
- hci.waitForCompletion(jobId);
+ try {
+ hci.waitForCompletion(jobId);
+ } catch (InterruptedException e) {
+ // Cancels an ongoing job is the current thread gets interrupted.
+ hci.cancelJob(jobId);
+ // Re-interrupt the current thread to make sure consequent things
get interrupted.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 148d4f5..6565c5a 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +38,7 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.BaseCCApplication;
@@ -54,6 +53,9 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
public abstract class AbstractMultiNCIntegrationTest {
@@ -133,6 +135,10 @@
hcc.waitForCompletion(jobId);
}
+ protected JobStatus getJobStatus(JobId jobId) throws Exception {
+ return hcc.getJobStatus(jobId);
+ }
+
protected void cancelJob(JobId jobId) throws Exception {
hcc.cancelJob(jobId);
}
@@ -205,15 +211,6 @@
}
}
}
- }
-
- protected File createTempFile() throws IOException {
- File tempFile = File.createTempFile(getClass().getName(), ".tmp",
outputFolder.getRoot());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Output file: " + tempFile.getAbsolutePath());
- }
- outputFiles.add(tempFile);
- return tempFile;
}
public static class DummyApplication extends BaseCCApplication {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7c3b66f..31f9224 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import
org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -59,6 +60,16 @@
import org.junit.Test;
public class CancelJobTest extends AbstractMultiNCIntegrationTest {
+
+ private JobId jobIdForInterruptTest = null;
+
+ @Test
+ public void interruptJobClientAfterWaitForCompletion() throws Exception {
+ // Interrupts the job client after waitForCompletion() is called.
+ for (JobSpecification spec : testJobs()) {
+ interruptAfterWaitForCompletion(spec);
+ }
+ }
@Test
public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
@@ -167,6 +178,41 @@
}
}
+ private void interruptAfterWaitForCompletion(JobSpecification spec) throws
Exception {
+ // Submits the job
+ Thread thread = new Thread(() -> {
+ try {
+ jobIdForInterruptTest = startJob(spec);
+ waitForCompletion(jobIdForInterruptTest);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof InterruptedException);
+ }
+ });
+ thread.start();
+
+ // Waits 1 second.
+ synchronized (this) {
+ wait(1000);
+ }
+
+ // Interrupt the job client.
+ thread.interrupt();
+
+ // Waits until the thread terminates.
+ thread.join();
+
+ // Verifies the job status.
+ JobStatus jobStatus = getJobStatus(jobIdForInterruptTest);
+ while (jobStatus == JobStatus.RUNNING) {
+ // Waits 1 second.
+ synchronized (this) {
+ wait(1000);
+ }
+ jobStatus = getJobStatus(jobIdForInterruptTest);
+ }
+ Assert.assertTrue(jobStatus == JobStatus.FAILURE);
+ }
+
private void cancelWithoutWait(JobSpecification spec) throws Exception {
JobId jobId = startJob(spec);
cancelJob(jobId);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1825
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>