Add the ability to query metrics on PipelineResult All runners currently implement this by throwing an UnsupportedOperationException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51fee39b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51fee39b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51fee39b Branch: refs/heads/master Commit: 51fee39b7bc66d7f60ea2e0ce31e3cb516a89305 Parents: 8524ed9 Author: bchambers <bchamb...@google.com> Authored: Wed Oct 12 10:55:05 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Thu Oct 13 15:29:29 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/direct/DirectRunner.java | 7 +++++++ .../org/apache/beam/runners/flink/FlinkRunnerResult.java | 6 ++++++ .../beam/runners/dataflow/DataflowPipelineJob.java | 7 +++++++ .../runners/spark/translation/EvaluationContext.java | 6 ++++++ .../main/java/org/apache/beam/sdk/PipelineResult.java | 11 +++++++++++ 5 files changed, 37 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index a72f7ae..e13046d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; @@ -380,6 +381,12 @@ public class DirectRunner }; } + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException( + "The DirectRunner does not currently support metrics."); + } + /** * Blocks until the {@link Pipeline} execution represented by this * {@link DirectPipelineResult} is complete, returning the terminal state. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 90bb64d..6b15485 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -86,4 +87,9 @@ public class FlinkRunnerResult implements PipelineResult { public State waitUntilFinish(Duration duration) { throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish."); } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 269b824..bbcf11f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -42,6 +42,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; @@ -426,6 +427,12 @@ public class DataflowPipelineJob implements PipelineResult { } } + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException( + "The DataflowRunner does not currently support metrics."); + } + private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator) throws IOException { if (aggregatorTransforms.contains(aggregator)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 2397276..1944b6b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -263,6 +264,11 @@ public class EvaluationContext implements EvaluationResult { } @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("The SparkRunner does not currently support metrics."); + } + + @Override public <T> Iterable<T> get(PCollection<T> pcollection) { @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index d9cdc16..d7774bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -18,6 +18,9 @@ package org.apache.beam.sdk; import java.io.IOException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -127,4 +130,12 @@ public interface PipelineResult { return hasReplacement; } } + + /** + * Return the object to access metrics from the pipeline. + * + * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics. + */ + @Experimental(Kind.METRICS) + MetricResults metrics(); }