[BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f6bd47ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f6bd47ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f6bd47ba
Branch: refs/heads/master
Commit: f6bd47ba79615d90660b4105d8a6c5c276af8551
Parents: 2571cfc
Author: Mark Liu <mark...@markliu0.mtv.corp.google.com>
Authored: Fri Sep 2 14:41:11 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 27 17:03:56 2016 -0700
----------------------------------------------------------------------
 .../beam/examples/WindowedWordCountIT.java | 49 ++++++
 .../dataflow/testing/TestDataflowRunner.java | 52 +++++-
 .../testing/TestDataflowRunnerTest.java | 158 +++++++++++++++++++
 3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
new file mode 100644
index 0000000..890ca2b
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.examples; + +import java.io.IOException; +import org.apache.beam.examples.WindowedWordCount.Options; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * End-to-end integration test of {@link WindowedWordCount}. + */ +@RunWith(JUnit4.class) +public class WindowedWordCountIT { + + /** + * Options for the {@link WindowedWordCount} Integration Test. 
+ */ + public interface TestOptions extends Options, TestPipelineOptions { + } + + @Test + public void testWindowedWordCount() throws IOException { + PipelineOptionsFactory.register(TestOptions.class); + TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class); + + WindowedWordCount.main(TestPipeline.convertToArgs(options)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 9be773b..c569cd4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory; */ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static final String TENTATIVE_COUNTER = "tentative"; + private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); private final TestDataflowPipelineOptions options; @@ -121,11 +123,17 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { if (result.isPresent()) { return result; } + result = checkMaxWatermark(job); + if (result.isPresent()) { + return result; + } Thread.sleep(10000L); } } finally { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); + if (!job.getState().isTerminal()) { + LOG.info("Cancelling Dataflow 
job {}", job.getJobId()); + job.cancel(); + } } } }); @@ -191,7 +199,8 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { int successes = 0; int failures = 0; for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null || metric.getName().getContext() == null + if (metric.getName() == null + || metric.getName().getContext() == null || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { // Don't double count using the non-tentative version of the metric. continue; @@ -208,7 +217,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { + } else if (successes > 0 && successes >= expectedNumberOfAssertions) { LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); @@ -222,6 +231,41 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { return Optional.<Boolean>absent(); } + Optional<Boolean> checkMaxWatermark(DataflowPipelineJob job) throws IOException { + State state = job.getState(); + if (state == State.FAILED || state == State.CANCELLED) { + LOG.info("The pipeline {}", state); + return Optional.of(false); + } + + JobMetrics metrics = options.getDataflowClient().projects().jobs() + .getMetrics(job.getProjectId(), job.getJobId()).execute(); + + if (metrics == null || metrics.getMetrics() == null) { + LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); + } else { + boolean hasMaxWatermark = false; + for (MetricUpdate metric : metrics.getMetrics()) { + if (metric.getName() == null + || metric.getName().getName() == null + || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX) + || metric.getScalar() == null) { + continue; 
+ } + BigDecimal watermark = (BigDecimal) metric.getScalar(); + hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE; + if (!hasMaxWatermark) { + break; + } + } + if (hasMaxWatermark) { + LOG.info("All watermarks of job {} reach to max value.", job.getJobId()); + return Optional.of(true); + } + } + return Optional.absent(); + } + @Override public String toString() { return "TestDataflowRunner#" + options.getAppName(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 24d5b23..70c4562 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -47,6 +47,7 @@ import com.google.common.collect.Lists; import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; +import java.util.List; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -226,6 +227,35 @@ public class TestDataflowRunnerTest { } @Test + public void testRunStreamingJobThatReachMaxWatermarkAndSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.RUNNING); + 
when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + runner.run(p, mockRunner); + } + + @Test public void testRunStreamingJobThatFails() throws Exception { options.setStreaming(true); Pipeline p = TestPipeline.create(options); @@ -253,6 +283,43 @@ public class TestDataflowRunnerTest { fail("AssertionError expected"); } + private LowLevelHttpResponse generateMockStreamingMetricResponse( + boolean hasWatermark, + boolean maxWatermark, + boolean multipleWatermarks, + boolean multipleMaxWatermark) throws IOException { + List<MetricUpdate> metrics = Lists.newArrayList(); + + MetricStructuredName name = new MetricStructuredName(); + name.setName(hasWatermark ? "windmill-data-watermark" : "no-watermark"); + name.setContext(ImmutableMap.<String, String>of()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(maxWatermark ? BigDecimal.valueOf(-2L) : BigDecimal.ONE); + metrics.add(metric); + + if (multipleWatermarks) { + MetricStructuredName nameTwo = new MetricStructuredName(); + nameTwo.setName(hasWatermark ? "windmill-data-watermark" : "no-watermark"); + nameTwo.setContext(ImmutableMap.<String, String>of()); + + MetricUpdate metricTwo = new MetricUpdate(); + metricTwo.setName(nameTwo); + metricTwo.setScalar(multipleMaxWatermark ? 
BigDecimal.valueOf(-2L) : BigDecimal.ONE); + metrics.add(metricTwo); + } + + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentType(Json.MEDIA_TYPE); + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(metrics); + // N.B. Setting the factory is necessary in order to get valid JSON. + jobMetrics.setFactory(Transport.getJsonFactory()); + response.setContent(jobMetrics.toPrettyString()); + return response; + } + @Test public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { DataflowPipelineJob job = @@ -320,6 +387,96 @@ public class TestDataflowRunnerTest { } @Test + public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + false /* hasWatermark */, + false /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + } + + @Test + public void testCheckMaxWatermarkWithSingleMaxWatermark() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.of(true), runner.checkMaxWatermark(job)); + } + + @Test + public void 
testCheckMaxWatermarkWithSingleWatermarkNotMax() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + false /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + } + + @Test + public void testCheckMaxWatermarkWithMultipleMaxWatermark() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + true /* multipleWatermarks */, + true /* multipleMaxWatermark */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.of(true), runner.checkMaxWatermark(job)); + } + + @Test + public void testCheckMaxWatermarkWithMaxAndNotMaxWatermarkMixed() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + true /* multipleWatermarks */, + false /* multipleMaxWatermark */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + } + + @Test public void 
testStreamingPipelineFailsIfServiceFails() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); @@ -332,6 +489,7 @@ public class TestDataflowRunnerTest { generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.FAILED).when(job).getState(); assertEquals(Optional.of(false), runner.checkForSuccess(job)); + assertEquals(Optional.of(false), runner.checkMaxWatermark(job)); } @Test