Repository: incubator-beam Updated Branches: refs/heads/master 9c8e19c1c -> 9e7ed2929
Check Dataflow Job Status Before Terminate Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e776ae73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e776ae73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e776ae73 Branch: refs/heads/master Commit: e776ae7334aed7e42d152dbbcfa7b5a1fb998e27 Parents: 9c8e19c Author: Mark Liu <mark...@markliu-macbookpro.roam.corp.google.com> Authored: Mon Sep 19 15:24:17 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon Sep 19 21:29:21 2016 -0700 ---------------------------------------------------------------------- .../runners/dataflow/DataflowPipelineJob.java | 19 +++-- .../dataflow/DataflowPipelineJobTest.java | 74 ++++++++++++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 1af8c98..269b824 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -301,14 +301,21 @@ public class DataflowPipelineJob implements PipelineResult { dataflowOptions.getDataflowClient().projects().jobs() .update(projectId, jobId, content) .execute(); + return State.CANCELLED; } catch (IOException e) { - String errorMsg = String.format( - "Failed to cancel the job, please go to the Developers Console to cancel it manually: %s", - MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); - LOG.warn(errorMsg); - throw new IOException(errorMsg, e); + State state = getState(); + if (state.isTerminal()) { + LOG.warn("Job is already terminated. State is {}", state); + return state; + } else { + String errorMsg = String.format( + "Failed to cancel the job, " + + "please go to the Developers Console to cancel it manually: %s", + MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); + LOG.warn(errorMsg); + throw new IOException(errorMsg, e); + } } - return State.CANCELLED; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 4c70d12..2af95e2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -29,7 +29,11 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.api.client.util.NanoClock; @@ -655,4 +659,74 @@ public class DataflowPipelineJobTest { fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000); } } + + @Test + public void testCancelUnterminatedJobThatSucceeds() throws IOException { + Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class); + when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); + when(update.execute()).thenReturn(new Job()); + + DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + + assertEquals(State.CANCELLED, job.cancel()); + Job content = new Job(); + content.setProjectId(PROJECT_ID); + content.setId(JOB_ID); + content.setRequestedState("JOB_STATE_CANCELLED"); + verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content)); + verifyNoMoreInteractions(mockJobs); + } + + @Test + public void testCancelUnterminatedJobThatFails() throws IOException { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class); + when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); + when(update.execute()).thenThrow(new IOException()); + + DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + + thrown.expect(IOException.class); + thrown.expectMessage("Failed to cancel the job, " + + "please go to the Developers Console to cancel it manually:"); + job.cancel(); + + Job content = new Job(); + content.setProjectId(PROJECT_ID); + content.setId(JOB_ID); + content.setRequestedState("JOB_STATE_CANCELLED"); + verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content)); + verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID)); + } + + @Test + public void testCancelTerminatedJob() throws IOException { + Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_FAILED"); + when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class); + when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update); + when(update.execute()).thenThrow(new IOException()); + + DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null); + + assertEquals(State.FAILED, job.cancel()); + Job content = new Job(); + content.setProjectId(PROJECT_ID); + content.setId(JOB_ID); + content.setRequestedState("JOB_STATE_CANCELLED"); + verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content)); + verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID)); + verifyNoMoreInteractions(mockJobs); + } }