Add the ability to query metrics on PipelineResult

All runners currently implement this by throwing an
UnsupportedOperationException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51fee39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51fee39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51fee39b

Branch: refs/heads/master
Commit: 51fee39b7bc66d7f60ea2e0ce31e3cb516a89305
Parents: 8524ed9
Author: bchambers <bchamb...@google.com>
Authored: Wed Oct 12 10:55:05 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/DirectRunner.java     |  7 +++++++
 .../org/apache/beam/runners/flink/FlinkRunnerResult.java |  6 ++++++
 .../beam/runners/dataflow/DataflowPipelineJob.java       |  7 +++++++
 .../runners/spark/translation/EvaluationContext.java     |  6 ++++++
 .../main/java/org/apache/beam/sdk/PipelineResult.java    | 11 +++++++++++
 5 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index a72f7ae..e13046d 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
@@ -380,6 +381,12 @@ public class DirectRunner
       };
     }
 
+    @Override
+    public MetricResults metrics() {
+      throw new UnsupportedOperationException(
+          "The DirectRunner does not currently support metrics.");
+    }
+
     /**
      * Blocks until the {@link Pipeline} execution represented by this
      * {@link DirectPipelineResult} is complete, returning the terminal state.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 90bb64d..6b15485 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
@@ -86,4 +87,9 @@ public class FlinkRunnerResult implements PipelineResult {
   public State waitUntilFinish(Duration duration) {
     throw new UnsupportedOperationException("FlinkRunnerResult does not 
support waitUntilFinish.");
   }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not 
currently support metrics.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 269b824..bbcf11f 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
@@ -426,6 +427,12 @@ public class DataflowPipelineJob implements PipelineResult 
{
     }
   }
 
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException(
+        "The DataflowRunner does not currently support metrics.");
+  }
+
   private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, 
OutputT> aggregator)
       throws IOException {
     if (aggregatorTransforms.contains(aggregator)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 2397276..1944b6b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -263,6 +264,11 @@ public class EvaluationContext implements EvaluationResult 
{
   }
 
   @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The SparkRunner does not 
currently support metrics.");
+  }
+
+  @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index d9cdc16..d7774bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -18,6 +18,9 @@
 package org.apache.beam.sdk;
 
 import java.io.IOException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
@@ -127,4 +130,12 @@ public interface PipelineResult {
       return hasReplacement;
     }
   }
+
+  /**
+   * Return the object to access metrics from the pipeline.
+   *
+   * @throws UnsupportedOperationException if the runner doesn't support 
retrieving metrics.
+   */
+  @Experimental(Kind.METRICS)
+  MetricResults metrics();
 }

Reply via email to