More javadoc and keep retry in case of get metrics exception

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370c1714
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370c1714
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370c1714

Branch: refs/heads/master
Commit: 370c171450215d3fac4208875850279a796415c9
Parents: fbae96f
Author: Mark Liu <mark...@markliu-macbookpro.roam.corp.google.com>
Authored: Wed Sep 14 13:18:40 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 27 17:03:57 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |   7 +-
 .../beam/examples/WindowedWordCountIT.java      |  13 +-
 .../dataflow/testing/TestDataflowRunner.java    | 158 +++++-----
 .../testing/TestDataflowRunnerTest.java         | 287 ++++++++++---------
 .../apache/beam/sdk/testing/StreamingIT.java    |  13 +-
 5 files changed, 263 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 31244db..6b1b7ce 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -218,11 +218,8 @@
     </profile>
 
     <!--
-      This profile disable streaming integration tests which
-      have @Category(StreamingIT.class) annotation.
-
-      This profile can be abled on the command line
-      by specifying -P disable-streaming-ITs.
+      This profile disables streaming integration tests which
+      have the @Category(StreamingIT.class) annotation.
     -->
     <profile>
       <id>disable-streaming-ITs</id>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index cddcd4a..379d1b0 100644
--- 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.beam.examples;
 
 import java.io.IOException;
@@ -39,23 +38,25 @@ public class WindowedWordCountIT {
   /**
    * Options for the {@link WindowedWordCount} Integration Test.
    */
-  public interface TestOptions extends Options, TestPipelineOptions, 
StreamingOptions{
+  public interface WindowedWordCountITOptions
+      extends Options, TestPipelineOptions, StreamingOptions {
   }
 
   @Test
   public void testWindowedWordCountInBatch() throws IOException {
-    testWindowedWordCountPipeline(false);
+    testWindowedWordCountPipeline(false /* isStreaming */);
   }
 
   @Test
   @Category(StreamingIT.class)
   public void testWindowedWordCountInStreaming() throws IOException {
-    testWindowedWordCountPipeline(true);
+    testWindowedWordCountPipeline(true /* isStreaming */);
   }
 
   private void testWindowedWordCountPipeline(boolean isStreaming) throws 
IOException {
-    PipelineOptionsFactory.register(TestOptions.class);
-    TestOptions options = 
TestPipeline.testingPipelineOptions().as(TestOptions.class);
+    PipelineOptionsFactory.register(WindowedWordCountITOptions.class);
+    WindowedWordCountITOptions options =
+        
TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class);
     options.setStreaming(isStreaming);
 
     WindowedWordCount.main(TestPipeline.convertToArgs(options));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index b8b4eaf..a152505 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
@@ -31,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -110,7 +112,7 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         job, new MonitoringUtil.LoggingHandler());
 
     try {
-      final Optional<Boolean> result;
+      final Optional<Boolean> success;
 
       if (options.isStreaming()) {
         Future<Optional<Boolean>> resultFuture = 
options.getExecutorService().submit(
@@ -119,9 +121,13 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
           public Optional<Boolean> call() throws Exception {
             try {
               for (;;) {
-                Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent() && (!result.get() || 
checkMaxWatermark(job))) {
-                  return result;
+                JobMetrics metrics = getJobMetrics(job);
+                Optional<Boolean> success = checkForPAssertSuccess(job, 
metrics);
+                if (success.isPresent() && (!success.get() || 
atMaxWatermark(job, metrics))) {
+                  // It's possible that the streaming pipeline doesn't use 
PAssert.
+                  // So checkForPAssertSuccess() will return true before job is 
finished.
+                  // atMaxWatermark() will handle this case.
+                  return success;
                 }
                 Thread.sleep(10000L);
               }
@@ -139,15 +145,15 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
               job.getJobId());
           job.cancel();
         }
-        result = resultFuture.get();
+        success = resultFuture.get();
       } else {
         job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        result = checkForSuccess(job);
+        success = checkForPAssertSuccess(job, getJobMetrics(job));
       }
-      if (!result.isPresent()) {
+      if (!success.isPresent()) {
         throw new IllegalStateException(
             "The dataflow did not output a success or failure metric.");
-      } else if (!result.get()) {
+      } else if (!success.get()) {
         throw new AssertionError(messageHandler.getErrorMessage() == null
             ? "The dataflow did not return a failure reason."
             : messageHandler.getErrorMessage());
@@ -178,7 +184,18 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     return runner.apply(transform, input);
   }
 
-  Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
+  /**
+   * Check that PAssert expectations were met.
+   *
+   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts 
were used
+   * within the pipeline, then this method will state that all PAsserts 
succeeded.
+   *
+   * @return Optional.of(false) if the job failed, was cancelled or if any 
PAssert
+   *         expectation was not met, Optional.of(true) if all the PAssert expectations 
passed,
+   *         Optional.absent() if the metrics were inconclusive.
+   */
+  @VisibleForTesting
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job, @Nullable 
JobMetrics metrics)
       throws IOException {
     State state = job.getState();
     if (state == State.FAILED || state == State.CANCELLED) {
@@ -186,74 +203,85 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       return Optional.of(false);
     }
 
-    JobMetrics metrics = options.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
     if (metrics == null || metrics.getMetrics() == null) {
       LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      int successes = 0;
-      int failures = 0;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null
-            || metric.getName().getContext() == null
-            || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-          // Don't double count using the non-tentative version of the metric.
-          continue;
-        }
-        if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-          successes += ((BigDecimal) metric.getScalar()).intValue();
-        } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) 
{
-          failures += ((BigDecimal) metric.getScalar()).intValue();
-        }
-      }
+      return Optional.absent();
+    }
 
-      if (failures > 0) {
-        LOG.info("Found result while running Dataflow job {}. Found {} 
success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(false);
-      } else if (successes >= expectedNumberOfAssertions) {
-        LOG.info("Found result while running Dataflow job {}. Found {} 
success, {} failures out of "
-            + "{} expected assertions.", job.getJobId(), successes, failures,
-            expectedNumberOfAssertions);
-        return Optional.of(true);
+    int successes = 0;
+    int failures = 0;
+    for (MetricUpdate metric : metrics.getMetrics()) {
+      if (metric.getName() == null
+          || metric.getName().getContext() == null
+          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+        // Don't double count using the non-tentative version of the metric.
+        continue;
       }
+      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+        successes += ((BigDecimal) metric.getScalar()).intValue();
+      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+        failures += ((BigDecimal) metric.getScalar()).intValue();
+      }
+    }
 
-      LOG.info("Running Dataflow job {}. Found {} success, {} failures out of 
{} expected "
-          + "assertions.", job.getJobId(), successes, failures, 
expectedNumberOfAssertions);
+    if (failures > 0) {
+      LOG.info("Found result while running Dataflow job {}. Found {} success, 
{} failures out of "
+          + "{} expected assertions.", job.getJobId(), successes, failures,
+          expectedNumberOfAssertions);
+      return Optional.of(false);
+    } else if (successes >= expectedNumberOfAssertions) {
+      LOG.info("Found result while running Dataflow job {}. Found {} success, 
{} failures out of "
+          + "{} expected assertions.", job.getJobId(), successes, failures,
+          expectedNumberOfAssertions);
+      return Optional.of(true);
     }
 
-    return Optional.<Boolean>absent();
+    LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} 
expected "
+        + "assertions.", job.getJobId(), successes, failures, 
expectedNumberOfAssertions);
+    return Optional.absent();
   }
 
-  boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException {
-    JobMetrics metrics = options.getDataflowClient().projects().jobs()
-        .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-    } else {
-      boolean hasMaxWatermark = false;
-      for (MetricUpdate metric : metrics.getMetrics()) {
-        if (metric.getName() == null
-            || metric.getName().getName() == null
-            || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
-            || metric.getScalar() == null) {
-          continue;
-        }
-        BigDecimal watermark = (BigDecimal) metric.getScalar();
-        hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE;
-        if (!hasMaxWatermark) {
-          break;
-        }
+  /**
+   * Check watermarks of the streaming job. At least one watermark metric must 
exist.
+   *
+   * @return true if all watermarks are at max, false otherwise.
+   */
+  @VisibleForTesting
+  boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) {
+    boolean hasMaxWatermark = false;
+    for (MetricUpdate metric : metrics.getMetrics()) {
+      if (metric.getName() == null
+          || metric.getName().getName() == null
+          || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
+          || metric.getScalar() == null) {
+        continue;
       }
-      if (hasMaxWatermark) {
-        LOG.info("All watermarks of job {} reach to max value.", 
job.getJobId());
-        return true;
+      BigDecimal watermark = (BigDecimal) metric.getScalar();
+      hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE;
+      if (!hasMaxWatermark) {
+        LOG.info("Found a non-max watermark metric {} in job {}", 
metric.getName().getName(),
+            job.getJobId());
+        return false;
       }
     }
-    return false;
+
+    if (hasMaxWatermark) {
+      LOG.info("All watermarks are at max. JobID: {}", job.getJobId());
+    }
+    return hasMaxWatermark;
+  }
+
+  @Nullable
+  @VisibleForTesting
+  JobMetrics getJobMetrics(DataflowPipelineJob job) {
+    JobMetrics metrics = null;
+    try {
+      metrics = options.getDataflowClient().projects().jobs()
+          .getMetrics(job.getProjectId(), job.getJobId()).execute();
+    } catch (IOException e) {
+      LOG.warn("Failed to get job metrics: ", e);
+    }
+    return metrics;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 3818b35..e6b513a 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.testing;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -50,6 +51,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -93,6 +95,9 @@ public class TestDataflowRunnerTest {
   @Mock private MockLowLevelHttpRequest request;
   @Mock private GcsUtil mockGcsUtil;
 
+  private static final String WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark";
+  private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2);
+
   private TestDataflowPipelineOptions options;
   private Dataflow service;
 
@@ -135,8 +140,8 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(true /* 
success */,
+        true /* tentative */, null /* additionalMetrics */));
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
@@ -155,6 +160,8 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
+        false /* tentative */, null /* additionalMetrics */));
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -192,8 +199,8 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
+        true /* tentative */, null /* additionalMetrics */));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
@@ -208,7 +215,7 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
-  public void testRunStreamingJobThatSucceeds() throws Exception {
+  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
     options.setStreaming(true);
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -223,18 +230,14 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */))
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */,
+            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     runner.run(p, mockRunner);
   }
 
   @Test
-  public void testRunStreamingJobThatReachMaxWatermarkAndSucceeds() throws 
Exception {
+  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws 
Exception {
     options.setStreaming(true);
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
@@ -249,15 +252,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute())
         .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */))
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     runner.run(p, mockRunner);
   }
@@ -277,8 +272,8 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
+        true /* tentative */, null /* additionalMetrics */));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
@@ -290,41 +285,61 @@ public class TestDataflowRunnerTest {
     fail("AssertionError expected");
   }
 
-  private LowLevelHttpResponse generateMockStreamingMetricResponse(
-      boolean hasWatermark,
-      boolean maxWatermark,
-      boolean multipleWatermarks,
-      boolean multipleMaxWatermark) throws IOException {
-    List<MetricUpdate> metrics = Lists.newArrayList();
+  private LowLevelHttpResponse generateMockMetricResponse(boolean success, 
boolean tentative,
+                                                          Map<String, 
BigDecimal> additionalMetrics)
+      throws Exception {
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
+    if (additionalMetrics != null && !additionalMetrics.isEmpty()) {
+      metrics.addAll(generateMockStreamingMetrics(additionalMetrics));
+    }
+    JobMetrics jobMetrics = buildJobMetrics(metrics);
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
 
+  private List<MetricUpdate> generateMockMetrics(boolean success, boolean 
tentative) {
     MetricStructuredName name = new MetricStructuredName();
-    name.setName(hasWatermark ? "windmill-data-watermark" : "no-watermark");
-    name.setContext(ImmutableMap.<String, String>of());
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, 
String>of());
 
     MetricUpdate metric = new MetricUpdate();
     metric.setName(name);
-    metric.setScalar(maxWatermark ? BigDecimal.valueOf(-2L) : BigDecimal.ONE);
-    metrics.add(metric);
-
-    if (multipleWatermarks) {
-      MetricStructuredName nameTwo = new MetricStructuredName();
-      nameTwo.setName(hasWatermark ? "windmill-data-watermark" : 
"no-watermark");
-      nameTwo.setContext(ImmutableMap.<String, String>of());
-
-      MetricUpdate metricTwo = new MetricUpdate();
-      metricTwo.setName(nameTwo);
-      metricTwo.setScalar(multipleMaxWatermark ? BigDecimal.valueOf(-2L) : 
BigDecimal.ONE);
-      metrics.add(metricTwo);
-    }
+    metric.setScalar(BigDecimal.ONE);
+    return Lists.newArrayList(metric);
+  }
 
+  private LowLevelHttpResponse generateMockStreamingMetricResponse(Map<String,
+      BigDecimal> metricMap) throws IOException {
     MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
     response.setContentType(Json.MEDIA_TYPE);
+    JobMetrics jobMetrics = 
buildJobMetrics(generateMockStreamingMetrics(metricMap));
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
+
+  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, 
BigDecimal> metricMap) {
+    List<MetricUpdate> metrics = Lists.newArrayList();
+    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
+      MetricStructuredName name = new MetricStructuredName();
+      name.setName(entry.getKey());
+
+      MetricUpdate metric = new MetricUpdate();
+      metric.setName(name);
+      metric.setScalar(entry.getValue());
+      metrics.add(metric);
+    }
+    return metrics;
+  }
+
+  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
     JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(metrics);
+    jobMetrics.setMetrics(metricList);
     // N.B. Setting the factory is necessary in order to get valid JSON.
     jobMetrics.setFactory(Transport.getJsonFactory());
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
+    return jobMetrics;
   }
 
   @Test
@@ -336,10 +351,10 @@ public class TestDataflowRunnerTest {
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
     doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(true), runner.checkForSuccess(job));
+    JobMetrics metrics = buildJobMetrics(
+        generateMockMetrics(true /* success */, true /* tentative */));
+    assertEquals(Optional.of(true), runner.checkForPAssertSuccess(job, 
metrics));
   }
 
   @Test
@@ -351,10 +366,10 @@ public class TestDataflowRunnerTest {
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
     doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+    JobMetrics metrics = buildJobMetrics(
+        generateMockMetrics(false /* success */, true /* tentative */));
+    assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, 
metrics));
   }
 
   @Test
@@ -366,121 +381,97 @@ public class TestDataflowRunnerTest {
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkForSuccess(job));
+    JobMetrics metrics = buildJobMetrics(
+        generateMockMetrics(true /* success */, false /* tentative */));
+    assertEquals(Optional.absent(), runner.checkForPAssertSuccess(job, 
metrics));
   }
 
-  private LowLevelHttpResponse generateMockMetricResponse(boolean success, 
boolean tentative)
-      throws Exception {
-    MetricStructuredName name = new MetricStructuredName();
-    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
-    name.setContext(
-        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, 
String>of());
-
-    MetricUpdate metric = new MetricUpdate();
-    metric.setName(name);
-    metric.setScalar(BigDecimal.ONE);
+  @Test
+  public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
 
-    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
-    response.setContentType(Json.MEDIA_TYPE);
-    JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(Lists.newArrayList(metric));
-    // N.B. Setting the factory is necessary in order to get valid JSON.
-    jobMetrics.setFactory(Transport.getJsonFactory());
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of("no-watermark", new BigDecimal(100))));
+    doReturn(State.RUNNING).when(job).getState();
+    assertFalse(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
-  public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
+  public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws 
IOException {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            false /* hasWatermark */,
-            false /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.checkMaxWatermark(job));
+    assertTrue(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
-  public void testCheckMaxWatermarkWithSingleMaxWatermark() throws IOException 
{
+  public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws 
IOException {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics
+        (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
     doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.checkMaxWatermark(job));
+    assertFalse(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
-  public void testCheckMaxWatermarkWithSingleWatermarkNotMax() throws 
IOException {
+  public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws 
IOException {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            false /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
+            "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.checkMaxWatermark(job));
+    assertTrue(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
-  public void testCheckMaxWatermarkWithMultipleMaxWatermark() throws 
IOException {
+  public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws 
IOException {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            true /* multipleWatermarks */,
-            true /* multipleMaxWatermark */));
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
+            "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
     doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.checkMaxWatermark(job));
+    assertFalse(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
-  public void testCheckMaxWatermarkWithMaxAndNotMaxWatermarkMixed() throws 
IOException {
+  public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws 
IOException {
     DataflowPipelineJob job =
         spy(new DataflowPipelineJob("test-project", "test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            true /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
+            "no-watermark", new BigDecimal(100))));
     doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.checkMaxWatermark(job));
+    assertTrue(runner.atMaxWatermark(job, metrics));
   }
 
   @Test
@@ -492,10 +483,8 @@ public class TestDataflowRunnerTest {
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.FAILED).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+    assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */));
   }
 
   @Test
@@ -526,8 +515,8 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
+        true /* tentative */, null /* additionalMetrics */));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     try {
       runner.run(p, mockRunner);
@@ -542,6 +531,35 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
+  public void testGetJobMetricsThatSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
+        true /* tentative */, null /* additionalMetrics */));
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    JobMetrics metrics = runner.getJobMetrics(job);
+
+    assertEquals(1, metrics.getMetrics().size());
+    assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
+        metrics.getMetrics());
+  }
+
+  @Test
+  public void testGetJobMetricsThatFailsForException() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(request.execute()).thenThrow(new IOException());
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    assertNull(runner.getJobMetrics(job));
+  }
+
+  @Test
   public void testBatchOnCreateMatcher() throws Exception {
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -559,8 +577,8 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
+        true /* tentative */, null /* additionalMetrics */));
     runner.run(p, mockRunner);
   }
 
@@ -587,12 +605,8 @@ public class TestDataflowRunnerTest {
         .thenReturn(State.DONE);
 
     when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */,
+            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     runner.run(p, mockRunner);
   }
 
@@ -614,8 +628,8 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
+        true /* tentative */, null /* additionalMetrics */));
     runner.run(p, mockRunner);
   }
 
@@ -642,12 +656,8 @@ public class TestDataflowRunnerTest {
         .thenReturn(State.DONE);
 
     when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
-        .thenReturn(generateMockStreamingMetricResponse(
-            true /* hasWatermark */,
-            true /* maxWatermark */,
-            false /* multipleWatermarks */,
-            false /* multipleMaxWatermark */));
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */,
+            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     runner.run(p, mockRunner);
   }
 
@@ -669,8 +679,8 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
+    when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
+        true /* tentative */, null /* additionalMetrics */));
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -704,7 +714,8 @@ public class TestDataflowRunnerTest {
         .thenReturn(State.FAILED);
 
     when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
+        generateMockMetricResponse(false /* success */, true /* tentative */,
+            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java
index b3dd4a0..4922d83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java
@@ -18,7 +18,18 @@
 package org.apache.beam.sdk.testing;
 
 /**
- * Category tag for validation tests which are expected running in streaming mode.
+ * Category tag used to mark tests which execute using the Dataflow runner
+ * in streaming mode. Example usage:
+ * <pre><code>
+ *    {@literal @}Test
+ *    {@literal @}Category(StreamingIT.class)
+ *     public void testStreamingPipeline() {
+ *       StreamingOptions options = ...;
+ *       options.setStreaming(true);
+ *       StreamingPipeline.main(...);
+ *     }
+ * </code></pre>
  */
+@Deprecated
 public interface StreamingIT {
 }


Reply via email to