[BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f6bd47ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f6bd47ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f6bd47ba

Branch: refs/heads/master
Commit: f6bd47ba79615d90660b4105d8a6c5c276af8551
Parents: 2571cfc
Author: Mark Liu <mark...@markliu0.mtv.corp.google.com>
Authored: Fri Sep 2 14:41:11 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 27 17:03:56 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/WindowedWordCountIT.java      |  49 ++++++
 .../dataflow/testing/TestDataflowRunner.java    |  52 +++++-
 .../testing/TestDataflowRunnerTest.java         | 158 +++++++++++++++++++
 3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
new file mode 100644
index 0000000..890ca2b
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples;
+
+import java.io.IOException;
+import org.apache.beam.examples.WindowedWordCount.Options;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end integration test of {@link WindowedWordCount}.
+ */
+@RunWith(JUnit4.class)
+public class WindowedWordCountIT {
+
+  /**
+   * Options for the {@link WindowedWordCount} Integration Test.
+   */
+  public interface TestOptions extends Options, TestPipelineOptions {}
+
+  @Test
+  public void testWindowedWordCount() throws IOException {
+    // There is no explicit check: the test passes iff WindowedWordCount.main returns normally.
+    PipelineOptionsFactory.register(TestOptions.class);
+    TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class);
+
+    WindowedWordCount.main(TestPipeline.convertToArgs(options));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 9be773b..c569cd4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory;
  */
 public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
+  private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
+  private static final long MAX_WATERMARK_VALUE = -2L; // Watermark metric value treated as max.
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
 
   private final TestDataflowPipelineOptions options;
@@ -121,11 +123,17 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 if (result.isPresent()) {
                   return result;
                 }
+                result = checkMaxWatermark(job);
+                if (result.isPresent()) {
+                  return result;
+                }
                 Thread.sleep(10000L);
               }
             } finally {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
+              if (!job.getState().isTerminal()) {
+                LOG.info("Cancelling Dataflow job {}", job.getJobId());
+                job.cancel();
+              }
             }
           }
         });
@@ -191,7 +199,8 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       int successes = 0;
       int failures = 0;
       for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null || metric.getName().getContext() == null
+        if (metric.getName() == null
+            || metric.getName().getContext() == null
             || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
           // Don't double count using the non-tentative version of the metric.
           continue;
@@ -208,7 +217,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             + "{} expected assertions.", job.getJobId(), successes, failures,
             expectedNumberOfAssertions);
         return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
+      } else if (successes > 0 && successes >= expectedNumberOfAssertions) {
         LOG.info("Found result while running Dataflow job {}. Found {} 
success, {} failures out of "
             + "{} expected assertions.", job.getJobId(), successes, failures,
             expectedNumberOfAssertions);
@@ -222,6 +231,55 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return Optional.<Boolean>absent();
   }
 
+  /**
+   * Checks whether every watermark metric of a streaming {@code job} has reached the max value.
+   *
+   * @return {@code Optional.of(false)} if the job is FAILED or CANCELLED;
+   *     {@code Optional.of(true)} if at least one metric whose name ends with
+   *     {@link #WATERMARK_METRIC_SUFFIX} is present and every such metric equals
+   *     {@link #MAX_WATERMARK_VALUE}; otherwise {@link Optional#absent()}, meaning the caller
+   *     should keep polling.
+   * @throws IOException if fetching the job metrics fails
+   */
+  Optional<Boolean> checkMaxWatermark(DataflowPipelineJob job) throws IOException {
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("Dataflow job {} terminated in state {}.", job.getJobId(), state);
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = options.getDataflowClient().projects().jobs()
+        .getMetrics(job.getProjectId(), job.getJobId()).execute();
+
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+      return Optional.absent();
+    }
+
+    boolean hasMaxWatermark = false;
+    for (MetricUpdate metric : metrics.getMetrics()) {
+      if (metric.getName() == null
+          || metric.getName().getName() == null
+          || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
+          || !(metric.getScalar() instanceof Number)) {
+        continue;
+      }
+      // getScalar() returns an untyped value; read any Number instead of casting to BigDecimal
+      // so a different numeric representation cannot cause a ClassCastException.
+      long watermark = ((Number) metric.getScalar()).longValue();
+      hasMaxWatermark = watermark == MAX_WATERMARK_VALUE;
+      if (!hasMaxWatermark) {
+        // One watermark that has not reached the max is enough to keep waiting.
+        break;
+      }
+    }
+    if (hasMaxWatermark) {
+      LOG.info("All watermarks of job {} reached the max value.", job.getJobId());
+      return Optional.of(true);
+    }
+    return Optional.absent();
+  }
+
   @Override
   public String toString() {
     return "TestDataflowRunner#" + options.getAppName();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 24d5b23..70c4562 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -47,6 +47,7 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -226,6 +227,35 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
+  public void testRunStreamingJobThatReachMaxWatermarkAndSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    // The runner polls checkForSuccess and then checkMaxWatermark, and each is expected to
+    // execute one metrics request, so stub one response per call. The responses carry no
+    // "tentative" PAssert counters, so checkForSuccess reports no result and the job can only
+    // finish because checkMaxWatermark sees the max watermark.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, true /* maxWatermark */,
+            false /* multipleWatermarks */, false /* multipleMaxWatermark */))
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, true /* maxWatermark */,
+            false /* multipleWatermarks */, false /* multipleMaxWatermark */));
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    runner.run(p, mockRunner);
+  }
+
+  @Test
   public void testRunStreamingJobThatFails() throws Exception {
     options.setStreaming(true);
     Pipeline p = TestPipeline.create(options);
@@ -253,6 +283,43 @@ public class TestDataflowRunnerTest {
     fail("AssertionError expected");
   }
 
+  /**
+   * Builds a mock metrics response with one metric, plus a second when
+   * {@code multipleWatermarks}. A metric's scalar is -2 (the max watermark value) if its flag,
+   * {@code maxWatermark} or {@code multipleMaxWatermark} respectively, is set, and 1 otherwise.
+   */
+  private LowLevelHttpResponse generateMockStreamingMetricResponse(
+      boolean hasWatermark,
+      boolean maxWatermark,
+      boolean multipleWatermarks,
+      boolean multipleMaxWatermark) throws IOException {
+    List<MetricUpdate> metrics = Lists.newArrayList();
+    metrics.add(generateMockWatermarkMetric(hasWatermark, maxWatermark));
+    if (multipleWatermarks) {
+      metrics.add(generateMockWatermarkMetric(hasWatermark, multipleMaxWatermark));
+    }
+
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(metrics);
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
+
+  /** Builds one mock metric; its scalar is -2 if {@code isMax} and 1 otherwise. */
+  private static MetricUpdate generateMockWatermarkMetric(boolean hasWatermark, boolean isMax) {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(hasWatermark ? "windmill-data-watermark" : "no-watermark");
+    name.setContext(ImmutableMap.<String, String>of());
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(isMax ? BigDecimal.valueOf(-2L) : BigDecimal.ONE);
+    return metric;
+  }
+
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
     DataflowPipelineJob job =
@@ -320,6 +387,96 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
+  public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+
+    // No metric name ends with the watermark suffix, so there is nothing to decide yet.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            false /* hasWatermark */, false /* maxWatermark */,
+            false /* multipleWatermarks */, false /* multipleMaxWatermark */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+  }
+
+  @Test
+  public void testCheckMaxWatermarkWithSingleMaxWatermark() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+
+    // The only watermark metric is at the max value, so the runner reports success.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, true /* maxWatermark */,
+            false /* multipleWatermarks */, false /* multipleMaxWatermark */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+  }
+
+  @Test
+  public void testCheckMaxWatermarkWithSingleWatermarkNotMax() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+
+    // The only watermark metric has not reached the max value, so the runner keeps polling.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, false /* maxWatermark */,
+            false /* multipleWatermarks */, false /* multipleMaxWatermark */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+  }
+
+  @Test
+  public void testCheckMaxWatermarkWithMultipleMaxWatermark() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+
+    // Every watermark metric is at the max value, so the runner reports success.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, true /* maxWatermark */,
+            true /* multipleWatermarks */, true /* multipleMaxWatermark */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+  }
+
+  @Test
+  public void testCheckMaxWatermarkWithMaxAndNotMaxWatermarkMixed() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+
+    // One watermark metric has not reached the max value, so the runner keeps polling.
+    when(request.execute())
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */, true /* maxWatermark */,
+            true /* multipleWatermarks */, false /* multipleMaxWatermark */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+  }
+
+  @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
@@ -332,6 +489,7 @@ public class TestDataflowRunnerTest {
         generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.FAILED).when(job).getState();
     assertEquals(Optional.of(false), runner.checkForSuccess(job));
+    assertEquals(Optional.of(false), runner.checkMaxWatermark(job));
   }
 
   @Test

Reply via email to