Yingyi Bu has submitted this change and it was merged. Change subject: Cancel the on-going job if waitForCompletion is interrupted. ......................................................................
Cancel the on-going job if waitForCompletion is interrupted. Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1825 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java 3 files changed, 56 insertions(+), 17 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 4b3aff2..ad54110 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -138,7 +138,13 @@ @Override public void waitForCompletion(JobId jobId) throws Exception { - hci.waitForCompletion(jobId); + try { + hci.waitForCompletion(jobId); + } catch (InterruptedException e) { + // Cancels an on-going job if the current thread gets interrupted. 
+ hci.cancelJob(jobId); + throw e; + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 148d4f5..05a7e2d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -26,8 +26,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -40,6 +38,7 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.control.cc.BaseCCApplication; @@ -52,8 +51,9 @@ import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; public abstract class AbstractMultiNCIntegrationTest { @@ -69,9 +69,6 @@ private static IHyracksClientConnection hcc; private final List<File> outputFiles; - - @Rule - public TemporaryFolder outputFolder = new 
TemporaryFolder(); public AbstractMultiNCIntegrationTest() { outputFiles = new ArrayList<>(); @@ -131,6 +128,10 @@ protected void waitForCompletion(JobId jobId) throws Exception { hcc.waitForCompletion(jobId); + } + + protected JobStatus getJobStatus(JobId jobId) throws Exception { + return hcc.getJobStatus(jobId); } protected void cancelJob(JobId jobId) throws Exception { @@ -205,15 +206,6 @@ } } } - } - - protected File createTempFile() throws IOException { - File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Output file: " + tempFile.getAbsolutePath()); - } - outputFiles.add(tempFile); - return tempFile; } public static class DummyApplication extends BaseCCApplication { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java index 7c3b66f..7eba9e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; @@ -59,6 +60,14 @@ import org.junit.Test; public class CancelJobTest extends AbstractMultiNCIntegrationTest { + + @Test + public void interruptJobClientAfterWaitForCompletion() throws Exception { + // 
Interrupts the job client after waitForCompletion() is called. + for (JobSpecification spec : testJobs()) { + interruptAfterWaitForCompletion(spec); + } + } @Test public void cancelExecutingJobAfterWaitForCompletion() throws Exception { @@ -167,6 +176,38 @@ } } + private void interruptAfterWaitForCompletion(JobSpecification spec) throws Exception { + // Submits the job + final JobId jobIdForInterruptTest = startJob(spec); + + // Waits for completion in another thread + Thread thread = new Thread(() -> { + try { + waitForCompletion(jobIdForInterruptTest); + } catch (Exception e) { + Assert.assertTrue(e instanceof InterruptedException); + } + }); + thread.start(); + + // Interrupts the wait-for-completion thread. + thread.interrupt(); + + // Waits until the thread terminates. + thread.join(); + + // Verifies the job status. + JobStatus jobStatus = getJobStatus(jobIdForInterruptTest); + while (jobStatus == JobStatus.RUNNING) { + synchronized (this) { + // Since job cancellation is asynchronous on NCs, we have to wait there. + wait(1000); + } + jobStatus = getJobStatus(jobIdForInterruptTest); + } + Assert.assertTrue(jobStatus == JobStatus.FAILURE); + } + private void cancelWithoutWait(JobSpecification spec) throws Exception { JobId jobId = startJob(spec); cancelJob(jobId); -- To view, visit https://asterix-gerrit.ics.uci.edu/1825 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
