Repository: incubator-beam Updated Branches: refs/heads/master 66318d829 -> a972b2330
Add RunnableOnService test for Metrics Add UsesMetrics interface and exclude from runners that don't yet support Metrics Add Serializability as needed for Metrics to be created during pipeline construction Remove test from DirectRunnerTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/998cabc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/998cabc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/998cabc8 Branch: refs/heads/master Commit: 998cabc8bbbf8d08d7bfad71e9376707388f5c5c Parents: 66318d8 Author: bchambers <bchamb...@google.com> Authored: Thu Dec 15 17:04:59 2016 -0800 Committer: bchambers <bchamb...@google.com> Committed: Mon Dec 19 11:29:39 2016 -0800 ---------------------------------------------------------------------- runners/apex/pom.xml | 3 +- .../beam/runners/direct/DirectRunnerTest.java | 39 ------------ runners/flink/runner/pom.xml | 6 +- runners/google-cloud-dataflow-java/pom.xml | 3 +- runners/spark/pom.xml | 3 +- .../org/apache/beam/sdk/metrics/MetricName.java | 3 +- .../org/apache/beam/sdk/metrics/Metrics.java | 5 +- .../apache/beam/sdk/testing/UsesMetrics.java | 24 ++++++++ .../apache/beam/sdk/metrics/MetricMatchers.java | 4 +- .../apache/beam/sdk/metrics/MetricsTest.java | 63 +++++++++++++++++++- 10 files changed, 103 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index f71637c..d03964d 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -186,7 +186,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index eb0f344..eafb788 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; @@ -37,7 +35,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -48,13 +45,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Distribution; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; @@ -467,35 +457,6 @@ public class DirectRunnerTest implements Serializable { } } - @Test - public void testMetrics() throws Exception { - Pipeline pipeline = getPipeline(); - pipeline - .apply(Create.of(5, 8, 13)) - .apply("MyStep", ParDo.of(new DoFn<Integer, Void>() { - @ProcessElement - public void processElement(ProcessContext c) { - Counter count = Metrics.counter(DirectRunnerTest.class, "count"); - Distribution values = Metrics.distribution(DirectRunnerTest.class, "input"); - - count.inc(); - values.update(c.element()); - } - })); - PipelineResult result = pipeline.run(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class)) - .build()); - - final String stepName = "MyStep/AnonymousParDo/AnonymousParMultiDo"; - assertThat(metrics.counters(), contains( - metricResult(DirectRunnerTest.class.getName(), "count", stepName, 3L, 3L))); - assertThat(metrics.distributions(), contains( - metricResult(DirectRunnerTest.class.getName(), "input", stepName, - DistributionResult.create(26L, 3L, 5L, 13L), - DistributionResult.create(26L, 3L, 5L, 13L)))); - } - private static class MustSplitSource<T> extends BoundedSource<T>{ public static <T> BoundedSource<T> of(BoundedSource<T> underlying) { return new MustSplitSource<>(underlying); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 09773e1..7f49372 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -56,7 +56,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> @@ -86,7 +87,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 46ac7ef..0094791 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -80,7 +80,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics </excludedGroups> <excludes> <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 5a2fe87..309e1ff 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -75,7 +75,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics </excludedGroups> <forkCount>1</forkCount> <reuseForks>false</reuseForks> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java index 843a885..3c77043 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -28,7 +29,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; */ @Experimental(Kind.METRICS) @AutoValue -public abstract class MetricName { +public abstract class MetricName implements Serializable { /** The namespace associated with this metric. */ public abstract String namespace(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index b72a0b2..045e076 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.metrics; +import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -58,7 +59,7 @@ public class Metrics { } /** Implementation of {@link Counter} that delegates to the instance for the current context. */ - private static class DelegatingCounter implements Counter { + private static class DelegatingCounter implements Counter, Serializable { private final MetricName name; private DelegatingCounter(MetricName name) { @@ -92,7 +93,7 @@ public class Metrics { /** * Implementation of {@link Distribution} that delegates to the instance for the current context. */ - private static class DelegatingDistribution implements Distribution { + private static class DelegatingDistribution implements Distribution, Serializable { private final MetricName name; private DelegatingDistribution(MetricName name) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java new file mode 100644 index 0000000..261354c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}. + */ +public interface UsesMetrics {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 6cd4c52..798d9d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -78,7 +78,7 @@ public class MetricMatchers { protected boolean matchesSafely(MetricResult<T> item) { return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) - && Objects.equals(step, item.step()) + && item.step().contains(step) && Objects.equals(committed, item.committed()) && Objects.equals(attempted, item.attempted()); } @@ -109,7 +109,7 @@ public class MetricMatchers { .appendText(" != ").appendValue(item.name().name()); } - if (!Objects.equals(step, item.step())) { + if (!item.step().contains(step)) { mismatchDescription .appendText("step: ").appendValue(step) .appendText(" != ").appendValue(item.step()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/998cabc8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 732cb34..075df19 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -18,18 +18,30 @@ package org.apache.beam.sdk.metrics; +import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesMetrics; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Tests for {@link Metrics}. */ -public class MetricsTest { +public class MetricsTest implements Serializable { private static final String NS = "test"; private static final String NAME = "name"; @@ -95,4 +107,53 @@ public class MetricsTest { counter.dec(); assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); } + + @Category({RunnableOnService.class, UsesMetrics.class}) + @Test + public void metricsReportToQuery() { + final Counter count = Metrics.counter(MetricsTest.class, "count"); + Pipeline pipeline = TestPipeline.create(); + pipeline + .apply(Create.of(5, 8, 13)) + .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { + Distribution values = Metrics.distribution(MetricsTest.class, "input"); + count.inc(); + values.update(c.element()); + + c.output(c.element()); + c.output(c.element()); + } + })) + .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { + Distribution values = Metrics.distribution(MetricsTest.class, "input"); + count.inc(); + values.update(c.element()); + } + })); + PipelineResult result = pipeline.run(); + + result.waitUntilFinish(); + + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. + assertThat(metrics.counters(), hasItem( + metricResult(MetricsTest.class.getName(), "count", "MyStep1", 3L, 3L))); + assertThat(metrics.distributions(), hasItem( + metricResult(MetricsTest.class.getName(), "input", "MyStep1", + DistributionResult.create(26L, 3L, 5L, 13L), + DistributionResult.create(26L, 3L, 5L, 13L)))); + + assertThat(metrics.counters(), hasItem( + metricResult(MetricsTest.class.getName(), "count", "MyStep2", 6L, 6L))); + assertThat(metrics.distributions(), hasItem( + metricResult(MetricsTest.class.getName(), "input", "MyStep2", + DistributionResult.create(52L, 6L, 5L, 13L), + DistributionResult.create(52L, 6L, 5L, 13L)))); + } }