Repository: incubator-beam Updated Branches: refs/heads/master 9b5cf3247 -> c0efe568e
Validate display data on transform primitives Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6269b8fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6269b8fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6269b8fa Branch: refs/heads/master Commit: 6269b8fa06daa4608f5ce5b11054cca58747a229 Parents: 9b5cf32 Author: Scott Wegner <sweg...@google.com> Authored: Mon May 9 13:31:39 2016 -0700 Committer: Scott Wegner <sweg...@google.com> Committed: Mon May 16 09:41:21 2016 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 10 +++ .../runners/dataflow/io/DataflowAvroIOTest.java | 69 ++++++++++++++++++++ .../dataflow/io/DataflowBigQueryIOTest.java | 29 +++++++- .../dataflow/io/DataflowDatastoreIOTest.java | 63 ++++++++++++++++++ .../dataflow/io/DataflowPubsubIOTest.java | 62 ++++++++++++++++++ .../runners/dataflow/io/DataflowTextIOTest.java | 15 ++++- .../DataflowDisplayDataEvaluator.java | 3 +- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 11 ++++ .../java/org/apache/beam/sdk/io/PubsubIO.java | 5 ++ .../org/apache/beam/sdk/io/BigQueryIOTest.java | 6 +- .../beam/sdk/io/CompressedSourceTest.java | 3 +- .../sdk/options/ProxyInvocationHandlerTest.java | 6 +- .../sdk/transforms/ApproximateUniqueTest.java | 3 +- .../display/DisplayDataEvaluator.java | 28 ++++---- .../display/DisplayDataEvaluatorTest.java | 24 ++++++- .../transforms/display/DisplayDataMatchers.java | 8 +++ .../display/DisplayDataMatchersTest.java | 3 +- .../sdk/transforms/windowing/WindowTest.java | 4 +- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 9 +-- 19 files changed, 316 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index a1d48ce..f7f1d80 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -288,11 +288,21 @@ </dependency> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> </dependency> <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-datastore-protobuf</artifactId> + </dependency> + + <dependency> <groupId>com.google.oauth-client</groupId> <artifactId>google-oauth-client</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java new file mode 100644 index 0000000..614affb --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; + +import org.apache.avro.Schema; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Set; + +/** + * {@link DataflowPipelineRunner} specific tests for {@link AvroIO} transforms. + */ +@RunWith(JUnit4.class) +public class DataflowAvroIOTest { + @Test + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + AvroIO.Write.Bound<?> write = AvroIO.Write + .to("foo") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("AvroIO.Write should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("fileNamePattern"))); + } + + @Test + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("AvroIO.Read should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("filePattern"))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java index 619da04..2b13b9c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; @@ -40,6 +39,30 @@ import java.util.Set; */ public class DataflowBigQueryIOTest { @Test + public void testTableSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .from("project:dataset.tableId") + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem("table"))); + } + + @Test + public void testQuerySourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .fromQuery("foobar") + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the query in its primitive display data", + displayData, hasItem(hasDisplayItem("query"))); + } + + @Test public void testBatchSinkPrimitiveDisplayData() { DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); options.setStreaming(false); @@ -63,9 +86,9 @@ public class DataflowBigQueryIOTest { Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("BigQueryIO.Write should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem(hasKey("tableSpec")))); + displayData, hasItem(hasDisplayItem("tableSpec"))); assertThat("BigQueryIO.Write should include the table schema in its primitive display data", - displayData, hasItem(hasDisplayItem(hasKey("schema")))); + displayData, hasItem(hasDisplayItem("schema"))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java new file mode 100644 index 0000000..42a0b99 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; +import org.apache.beam.sdk.io.DatastoreIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; + +import com.google.api.services.datastore.DatastoreV1; + +import org.junit.Test; + +import java.util.Set; + +/** + * Unit tests for Dataflow usage of {@link DatastoreIO} transforms. + */ +public class DataflowDatastoreIOTest { + @Test + public void testSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + PTransform<PInput, ?> read = DatastoreIO.readFrom( + "myDataset", DatastoreV1.Query.newBuilder().build()); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("DatastoreIO read should include the dataset in its primitive display data", + displayData, hasItem(hasDisplayItem("dataset"))); + } + + @Test + public void testSinkPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + PTransform<PCollection<DatastoreV1.Entity>, ?> write = DatastoreIO.writeTo("myDataset"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the dataset in its primitive display data", + displayData, hasItem(hasDisplayItem("dataset"))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java new file mode 100644 index 0000000..4874877 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; +import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Set; + +/** + * {@link DataflowPipelineRunner} specific tests for {@link PubsubIO} transforms. + */ +@RunWith(JUnit4.class) +public class DataflowPubsubIOTest { + @Test + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + PubsubIO.Write.Bound<?> write = PubsubIO.Write + .topic("projects/project/topics/topic"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("PubsubIO.Write should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } + + @Test + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + PubsubIO.Read.Bound<String> read = PubsubIO.Read.topic("projects/project/topics/topic"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("PubsubIO.Read should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java index 8ff7d0e..0d7c1cb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java @@ -128,7 +128,7 @@ public class DataflowTextIOTest { } @Test - public void testPrimitiveDisplayData() { + public void testPrimitiveWriteDisplayData() { DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); TextIO.Write.Bound<?> write = TextIO.Write.to("foobar"); @@ -137,4 +137,17 @@ public class DataflowTextIOTest { assertThat("TextIO.Write should include the file prefix in its primitive display data", displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); } + + @Test + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + TextIO.Read.Bound<String> read = TextIO.Read + .from("foobar") + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("TextIO.Read should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java index db42c77..0b865c3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java @@ -43,6 +43,7 @@ public final class DataflowDisplayDataEvaluator { public static DataflowPipelineOptions getDefaultOptions() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); options.setProject("foobar"); options.setTempLocation("gs://bucket/tmpLocation"); options.setFilesToStage(Lists.<String>newArrayList()); @@ -66,6 +67,6 @@ public final class DataflowDisplayDataEvaluator { * the {@link DataflowPipelineRunner} with the specified {@code options}. */ public static DisplayDataEvaluator create(DataflowPipelineOptions options) { - return DisplayDataEvaluator.forRunner(DataflowPipelineRunner.class, options); + return DisplayDataEvaluator.create(options); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index a5ef39f..3f22648 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -802,6 +802,12 @@ public class BigQueryIO { protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { // Do nothing. } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("table", jsonTable)); + } } /** @@ -893,6 +899,11 @@ public class BigQueryIO { tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("query", query)); + } private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index fa867c2..6a14477 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -1030,6 +1030,11 @@ public class PubsubIO { checkState(n == output.size()); output.clear(); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Bound.this.populateDisplayData(builder); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index d00914c..6849018 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString; import static org.apache.beam.sdk.io.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -710,8 +710,8 @@ public class BigQueryIOTest implements Serializable { DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem(hasKey("table"))); - assertThat(displayData, hasDisplayItem(hasKey("schema"))); + assertThat(displayData, hasDisplayItem("table")); + assertThat(displayData, hasDisplayItem("schema")); assertThat(displayData, hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString())); assertThat(displayData, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 76e547f..542e734 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; import static org.hamcrest.Matchers.containsString; @@ -353,7 +352,7 @@ public class CompressedSourceTest { DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource); DisplayData gzipDisplayData = DisplayData.from(gzipSource); - assertThat(compressedSourceDisplayData, hasDisplayItem(hasKey("compressionMode"))); + assertThat(compressedSourceDisplayData, hasDisplayItem("compressionMode")); assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString())); assertThat(compressedSourceDisplayData, hasDisplayItem("source", inputSource.getClass())); assertThat(compressedSourceDisplayData, includesDisplayDataFrom(inputSource)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 228a6ba..6fc9700 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -834,7 +834,7 @@ public class ProxyInvocationHandlerTest { PipelineOptions options = PipelineOptionsFactory.as(HasDefaults.class); DisplayData data = DisplayData.from(options); - assertThat(data, not(hasDisplayItem(hasKey("foo")))); + assertThat(data, not(hasDisplayItem("foo"))); } interface HasDefaults extends PipelineOptions { @@ -849,7 +849,7 @@ public class ProxyInvocationHandlerTest { assertEquals("bar", options.getFoo()); DisplayData data = DisplayData.from(options); - assertThat(data, not(hasDisplayItem(hasKey("foo")))); + assertThat(data, not(hasDisplayItem("foo"))); } @Test @@ -958,6 +958,6 @@ public class ProxyInvocationHandlerTest { options.setValue("foobar"); DisplayData data = DisplayData.from(options); - assertThat(data, not(hasDisplayItem(hasKey("value")))); + assertThat(data, not(hasDisplayItem("value"))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index c94c9f1..4f00ed4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -305,6 +304,6 @@ public class ApproximateUniqueTest implements Serializable { DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError); assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234)); assertThat("calculated sampleSize should be included", maxErrorDisplayData, - hasDisplayItem(hasKey("sampleSize"))); + hasDisplayItem("sampleSize")); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java index 6759003..a17e06f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -20,9 +20,9 @@ package org.apache.beam.sdk.transforms.display; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -40,25 +40,21 @@ public class DisplayDataEvaluator { private final PipelineOptions options; /** - * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and - * default {@link PipelineOptions}. + * Create a new {@link DisplayDataEvaluator} using {@link TestPipeline#testingPipelineOptions()}. */ - public static DisplayDataEvaluator forRunner(Class<? extends PipelineRunner<?>> pipelineRunner) { - return forRunner(pipelineRunner, PipelineOptionsFactory.create()); + public static DisplayDataEvaluator create() { + return create(TestPipeline.testingPipelineOptions()); } /** - * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and - * {@link PipelineOptions}. + * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineOptions}. */ - public static DisplayDataEvaluator forRunner( - Class<? extends PipelineRunner<?>> pipelineRunner, PipelineOptions pipelineOptions) { - return new DisplayDataEvaluator(pipelineRunner, pipelineOptions); + public static DisplayDataEvaluator create(PipelineOptions pipelineOptions) { + return new DisplayDataEvaluator(pipelineOptions); } - private DisplayDataEvaluator(Class<? extends PipelineRunner<?>> runner, PipelineOptions options) { + private DisplayDataEvaluator(PipelineOptions options) { this.options = options; - this.options.setRunner(runner); } /** @@ -109,7 +105,7 @@ public class DisplayDataEvaluator { extends Pipeline.PipelineVisitor.Defaults { private final PTransform root; private final Set<DisplayData> displayData; - private boolean shouldRecord = false; + private boolean inCompositeRoot = false; PrimitiveDisplayDataPTransformVisitor(PTransform root) { this.root = root; @@ -123,7 +119,7 @@ public class DisplayDataEvaluator { @Override public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { if (Objects.equals(root, node.getTransform())) { - shouldRecord = true; + inCompositeRoot = true; } return CompositeBehavior.ENTER_TRANSFORM; @@ -132,13 +128,13 @@ public class DisplayDataEvaluator { @Override public void leaveCompositeTransform(TransformTreeNode node) { if (Objects.equals(root, node.getTransform())) { - shouldRecord = false; + inCompositeRoot = false; } } @Override public void visitPrimitiveTransform(TransformTreeNode node) { - if (shouldRecord) { + if (inCompositeRoot || Objects.equals(root, node.getTransform())) { displayData.add(DisplayData.from(node.getTransform())); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java index f24c133..7b1dc79 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java @@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -41,7 +40,7 @@ import java.util.Set; public class DisplayDataEvaluatorTest implements Serializable { @Test - public void testDisplayDataForPrimitiveTransforms() { + public void testCompositeTransform() { PTransform<? super PCollection<String>, ? super POutput> myTransform = new PTransform<PCollection<String>, POutput> () { @Override @@ -65,10 +64,29 @@ public class DisplayDataEvaluatorTest implements Serializable { } }; - DisplayDataEvaluator evaluator = DisplayDataEvaluator.forRunner(DirectPipelineRunner.class); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(myTransform); assertThat(displayData, not(hasItem(hasDisplayItem("compositeKey", "compositeValue")))); assertThat(displayData, hasItem(hasDisplayItem("primitiveKey", "primitiveValue"))); } + + @Test + public void testPrimitiveTransform() { + PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of( + new DoFn<Integer, Integer>() { + @Override + public void processElement(ProcessContext c) throws Exception {} + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }); + + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(myTransform); + + assertThat(displayData, hasItem(hasDisplayItem("foo"))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java index e3721b8..4207624 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java @@ -57,6 +57,14 @@ public class DisplayDataMatchers { } /** + * Creates a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key. + */ + public static Matcher<DisplayData> hasDisplayItem(String key) { + return hasDisplayItem(hasKey(key)); + } + + /** * Create a matcher that matches if the examined {@link DisplayData} contains an item with the * specified key and String value. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java index f848c5e..f9f2911 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms.display; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -67,7 +66,7 @@ public class DisplayDataMatchersTest { @Test public void testHasKey() { - Matcher<DisplayData> matcher = hasDisplayItem(hasKey("foo")); + Matcher<DisplayData> matcher = hasDisplayItem("foo"); assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar"))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index cd5eb2d..c858f32 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -292,7 +292,7 @@ public class WindowTest implements Serializable { .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); DisplayData data = DisplayData.from(window); - assertThat(data, not(hasDisplayItem(hasKey("trigger")))); - assertThat(data, not(hasDisplayItem(hasKey("allowedLateness")))); + assertThat(data, not(hasDisplayItem("trigger"))); + assertThat(data, not(hasDisplayItem("allowedLateness"))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 403ad9d..357ab44 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -20,10 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigtable; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; -import static org.apache.beam.sdk.testing.SourceTestUtils - .assertSplitAtFractionSucceedsAndConsistent; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verifyNotNull; @@ -428,7 +426,7 @@ public class BigtableIOTest { assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString())); // BigtableIO adds user-agent to options; assert only on key and not value. - assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions"))); + assertThat(displayData, hasDisplayItem("bigtableOptions")); } /** Tests that a record gets written to the service and messages are logged. */ @@ -494,10 +492,7 @@ public class BigtableIOTest { .withBigtableOptions(BIGTABLE_OPTIONS); DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("tableId", "fooTable")); - // BigtableIO adds user-agent to options; assert only on key and not value. - assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions"))); } ////////////////////////////////////////////////////////////////////////////////////////////