Repository: incubator-beam
Updated Branches:
  refs/heads/master 9b5cf3247 -> c0efe568e


Validate display data on transform primitives


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6269b8fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6269b8fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6269b8fa

Branch: refs/heads/master
Commit: 6269b8fa06daa4608f5ce5b11054cca58747a229
Parents: 9b5cf32
Author: Scott Wegner <sweg...@google.com>
Authored: Mon May 9 13:31:39 2016 -0700
Committer: Scott Wegner <sweg...@google.com>
Committed: Mon May 16 09:41:21 2016 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      | 10 +++
 .../runners/dataflow/io/DataflowAvroIOTest.java | 69 ++++++++++++++++++++
 .../dataflow/io/DataflowBigQueryIOTest.java     | 29 +++++++-
 .../dataflow/io/DataflowDatastoreIOTest.java    | 63 ++++++++++++++++++
 .../dataflow/io/DataflowPubsubIOTest.java       | 62 ++++++++++++++++++
 .../runners/dataflow/io/DataflowTextIOTest.java | 15 ++++-
 .../DataflowDisplayDataEvaluator.java           |  3 +-
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 11 ++++
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  5 ++
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  6 +-
 .../beam/sdk/io/CompressedSourceTest.java       |  3 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  6 +-
 .../sdk/transforms/ApproximateUniqueTest.java   |  3 +-
 .../display/DisplayDataEvaluator.java           | 28 ++++----
 .../display/DisplayDataEvaluatorTest.java       | 24 ++++++-
 .../transforms/display/DisplayDataMatchers.java |  8 +++
 .../display/DisplayDataMatchersTest.java        |  3 +-
 .../sdk/transforms/windowing/WindowTest.java    |  4 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  9 +--
 19 files changed, 316 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index a1d48ce..f7f1d80 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -288,11 +288,21 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-datastore-protobuf</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.oauth-client</groupId>
       <artifactId>google-oauth-client</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
new file mode 100644
index 0000000..614affb
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import 
org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for {@link AvroIO} transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowAvroIOTest {
+  @Test
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    AvroIO.Write.Bound<?> write = AvroIO.Write
+        .to("foo")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("AvroIO.Write should include the file pattern in its primitive 
transform",
+        displayData, hasItem(hasDisplayItem("fileNamePattern")));
+  }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("AvroIO.Read should include the file pattern in its primitive 
transform",
+        displayData, hasItem(hasDisplayItem("filePattern")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
index 619da04..2b13b9c 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow.io;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
@@ -40,6 +39,30 @@ import java.util.Set;
  */
 public class DataflowBigQueryIOTest {
   @Test
+  public void testTableSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from("project:dataset.tableId")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the table spec in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("table")));
+  }
+
+  @Test
+  public void testQuerySourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .fromQuery("foobar")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the query in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("query")));
+  }
+
+  @Test
   public void testBatchSinkPrimitiveDisplayData() {
     DataflowPipelineOptions options = 
DataflowDisplayDataEvaluator.getDefaultOptions();
     options.setStreaming(false);
@@ -63,9 +86,9 @@ public class DataflowBigQueryIOTest {
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("BigQueryIO.Write should include the table spec in its 
primitive display data",
-        displayData, hasItem(hasDisplayItem(hasKey("tableSpec"))));
+        displayData, hasItem(hasDisplayItem("tableSpec")));
 
     assertThat("BigQueryIO.Write should include the table schema in its 
primitive display data",
-        displayData, hasItem(hasDisplayItem(hasKey("schema"))));
+        displayData, hasItem(hasDisplayItem("schema")));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
new file mode 100644
index 0000000..42a0b99
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import 
org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.DatastoreIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.api.services.datastore.DatastoreV1;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+/**
+ * Unit tests for Dataflow usage of {@link DatastoreIO} transforms.
+ */
+public class DataflowDatastoreIOTest {
+  @Test
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PTransform<PInput, ?> read = DatastoreIO.readFrom(
+        "myDataset", DatastoreV1.Query.newBuilder().build());
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("DatastoreIO read should include the dataset in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("dataset")));
+  }
+
+  @Test
+  public void testSinkPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PTransform<PCollection<DatastoreV1.Entity>, ?> write = 
DatastoreIO.writeTo("myDataset");
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the dataset in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("dataset")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
new file mode 100644
index 0000000..4874877
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import 
org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for {@link PubsubIO} 
transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPubsubIOTest {
+  @Test
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PubsubIO.Write.Bound<?> write = PubsubIO.Write
+        .topic("projects/project/topics/topic");
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("PubsubIO.Write should include the topic in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("topic")));
+  }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PubsubIO.Read.Bound<String> read = 
PubsubIO.Read.topic("projects/project/topics/topic");
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("PubsubIO.Read should include the topic in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("topic")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index 8ff7d0e..0d7c1cb 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -128,7 +128,7 @@ public class DataflowTextIOTest {
   }
 
   @Test
-  public void testPrimitiveDisplayData() {
+  public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
 
     TextIO.Write.Bound<?> write = TextIO.Write.to("foobar");
@@ -137,4 +137,17 @@ public class DataflowTextIOTest {
     assertThat("TextIO.Write should include the file prefix in its primitive 
display data",
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    TextIO.Read.Bound<String> read = TextIO.Read
+        .from("foobar")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("TextIO.Read should include the file prefix in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
index db42c77..0b865c3 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
@@ -43,6 +43,7 @@ public final class DataflowDisplayDataEvaluator {
   public static DataflowPipelineOptions getDefaultOptions() {
     DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 
+    options.setRunner(DataflowPipelineRunner.class);
     options.setProject("foobar");
     options.setTempLocation("gs://bucket/tmpLocation");
     options.setFilesToStage(Lists.<String>newArrayList());
@@ -66,6 +67,6 @@ public final class DataflowDisplayDataEvaluator {
    * the {@link DataflowPipelineRunner} with the specified {@code options}.
    */
   public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
-    return DisplayDataEvaluator.forRunner(DataflowPipelineRunner.class, 
options);
+    return DisplayDataEvaluator.create(options);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index a5ef39f..3f22648 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -802,6 +802,12 @@ public class BigQueryIO {
     protected void cleanupTempResource(BigQueryOptions bqOptions) throws 
Exception {
       // Do nothing.
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("table", jsonTable));
+    }
   }
 
   /**
@@ -893,6 +899,11 @@ public class BigQueryIO {
       tableService.deleteDataset(tableToRemove.getProjectId(), 
tableToRemove.getDatasetId());
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("query", query));
+    }
     private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions 
bqOptions)
         throws InterruptedException, IOException {
       if (dryRunJobStats.get() == null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index fa867c2..6a14477 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -1030,6 +1030,11 @@ public class PubsubIO {
           checkState(n == output.size());
           output.clear();
         }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          Bound.this.populateDisplayData(builder);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index d00914c..6849018 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString;
 import static org.apache.beam.sdk.io.BigQueryIO.toJsonString;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -710,8 +710,8 @@ public class BigQueryIOTest implements Serializable {
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem(hasKey("table")));
-    assertThat(displayData, hasDisplayItem(hasKey("schema")));
+    assertThat(displayData, hasDisplayItem("table"));
+    assertThat(displayData, hasDisplayItem("schema"));
     assertThat(displayData,
         hasDisplayItem("createDisposition", 
CreateDisposition.CREATE_IF_NEEDED.toString()));
     assertThat(displayData,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 76e547f..542e734 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
 
 import static org.hamcrest.Matchers.containsString;
@@ -353,7 +352,7 @@ public class CompressedSourceTest {
     DisplayData compressedSourceDisplayData = 
DisplayData.from(compressedSource);
     DisplayData gzipDisplayData = DisplayData.from(gzipSource);
 
-    assertThat(compressedSourceDisplayData, 
hasDisplayItem(hasKey("compressionMode")));
+    assertThat(compressedSourceDisplayData, hasDisplayItem("compressionMode"));
     assertThat(gzipDisplayData, hasDisplayItem("compressionMode", 
CompressionMode.GZIP.toString()));
     assertThat(compressedSourceDisplayData, hasDisplayItem("source", 
inputSource.getClass()));
     assertThat(compressedSourceDisplayData, 
includesDisplayDataFrom(inputSource));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 228a6ba..6fc9700 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -834,7 +834,7 @@ public class ProxyInvocationHandlerTest {
     PipelineOptions options = PipelineOptionsFactory.as(HasDefaults.class);
     DisplayData data = DisplayData.from(options);
 
-    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+    assertThat(data, not(hasDisplayItem("foo")));
   }
 
   interface HasDefaults extends PipelineOptions {
@@ -849,7 +849,7 @@ public class ProxyInvocationHandlerTest {
     assertEquals("bar", options.getFoo());
 
     DisplayData data = DisplayData.from(options);
-    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+    assertThat(data, not(hasDisplayItem("foo")));
   }
 
   @Test
@@ -958,6 +958,6 @@ public class ProxyInvocationHandlerTest {
     options.setValue("foobar");
 
     DisplayData data = DisplayData.from(options);
-    assertThat(data, not(hasDisplayItem(hasKey("value"))));
+    assertThat(data, not(hasDisplayItem("value")));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index c94c9f1..4f00ed4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -305,6 +304,6 @@ public class ApproximateUniqueTest implements Serializable {
     DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
     assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 
0.1234));
     assertThat("calculated sampleSize should be included", maxErrorDisplayData,
-        hasDisplayItem(hasKey("sampleSize")));
+        hasDisplayItem("sampleSize"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index 6759003..a17e06f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.transforms.display;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -40,25 +40,21 @@ public class DisplayDataEvaluator {
   private final PipelineOptions options;
 
   /**
-   * Create a new {@link DisplayDataEvaluator} using the specified {@link 
PipelineRunner} and
-   * default {@link PipelineOptions}.
+   * Create a new {@link DisplayDataEvaluator} using {@link 
TestPipeline#testingPipelineOptions()}.
    */
-  public static DisplayDataEvaluator forRunner(Class<? extends 
PipelineRunner<?>> pipelineRunner) {
-    return forRunner(pipelineRunner, PipelineOptionsFactory.create());
+  public static DisplayDataEvaluator create() {
+    return create(TestPipeline.testingPipelineOptions());
   }
 
   /**
-   * Create a new {@link DisplayDataEvaluator} using the specified {@link 
PipelineRunner} and
-   * {@link PipelineOptions}.
+   * Create a new {@link DisplayDataEvaluator} using the specified {@link 
PipelineOptions}.
    */
-  public static DisplayDataEvaluator forRunner(
-      Class<? extends PipelineRunner<?>> pipelineRunner, PipelineOptions 
pipelineOptions) {
-    return new DisplayDataEvaluator(pipelineRunner, pipelineOptions);
+  public static DisplayDataEvaluator create(PipelineOptions pipelineOptions) {
+    return new DisplayDataEvaluator(pipelineOptions);
   }
 
-  private DisplayDataEvaluator(Class<? extends PipelineRunner<?>> runner, 
PipelineOptions options) {
+  private DisplayDataEvaluator(PipelineOptions options) {
     this.options = options;
-    this.options.setRunner(runner);
   }
 
   /**
@@ -109,7 +105,7 @@ public class DisplayDataEvaluator {
   extends Pipeline.PipelineVisitor.Defaults {
     private final PTransform root;
     private final Set<DisplayData> displayData;
-    private boolean shouldRecord = false;
+    private boolean inCompositeRoot = false;
 
     PrimitiveDisplayDataPTransformVisitor(PTransform root) {
       this.root = root;
@@ -123,7 +119,7 @@ public class DisplayDataEvaluator {
     @Override
     public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
       if (Objects.equals(root, node.getTransform())) {
-        shouldRecord = true;
+        inCompositeRoot = true;
       }
 
       return CompositeBehavior.ENTER_TRANSFORM;
@@ -132,13 +128,13 @@ public class DisplayDataEvaluator {
     @Override
     public void leaveCompositeTransform(TransformTreeNode node) {
       if (Objects.equals(root, node.getTransform())) {
-        shouldRecord = false;
+        inCompositeRoot = false;
       }
     }
 
     @Override
     public void visitPrimitiveTransform(TransformTreeNode node) {
-      if (shouldRecord) {
+      if (inCompositeRoot || Objects.equals(root, node.getTransform())) {
         displayData.add(DisplayData.from(node.getTransform()));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index f24c133..7b1dc79 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -41,7 +40,7 @@ import java.util.Set;
 public class DisplayDataEvaluatorTest implements Serializable {
 
   @Test
-  public void testDisplayDataForPrimitiveTransforms() {
+  public void testCompositeTransform() {
     PTransform<? super PCollection<String>, ? super POutput> myTransform =
         new PTransform<PCollection<String>, POutput> () {
           @Override
@@ -65,10 +64,29 @@ public class DisplayDataEvaluatorTest implements 
Serializable {
           }
         };
 
-    DisplayDataEvaluator evaluator = 
DisplayDataEvaluator.forRunner(DirectPipelineRunner.class);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(myTransform);
 
     assertThat(displayData, not(hasItem(hasDisplayItem("compositeKey", 
"compositeValue"))));
     assertThat(displayData, hasItem(hasDisplayItem("primitiveKey", 
"primitiveValue")));
   }
+
+  @Test
+  public void testPrimitiveTransform() {
+    PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> 
myTransform = ParDo.of(
+        new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {}
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    });
+
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(myTransform);
+
+    assertThat(displayData, hasItem(hasDisplayItem("foo")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
index e3721b8..4207624 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
@@ -57,6 +57,14 @@ public class DisplayDataMatchers {
   }
 
   /**
+   * Creates a matcher that matches if the examined {@link DisplayData} 
contains an item with the
+   * specified key.
+   */
+  public static Matcher<DisplayData> hasDisplayItem(String key) {
+    return hasDisplayItem(hasKey(key));
+  }
+
+  /**
    * Create a matcher that matches if the examined {@link DisplayData} 
contains an item with the
    * specified key and String value.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index f848c5e..f9f2911 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms.display;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
@@ -67,7 +66,7 @@ public class DisplayDataMatchersTest {
 
   @Test
   public void testHasKey() {
-    Matcher<DisplayData> matcher = hasDisplayItem(hasKey("foo"));
+    Matcher<DisplayData> matcher = hasDisplayItem("foo");
 
     assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar")));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index cd5eb2d..c858f32 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -292,7 +292,7 @@ public class WindowTest implements Serializable {
         
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     DisplayData data = DisplayData.from(window);
-    assertThat(data, not(hasDisplayItem(hasKey("trigger"))));
-    assertThat(data, not(hasDisplayItem(hasKey("allowedLateness"))));
+    assertThat(data, not(hasDisplayItem("trigger")));
+    assertThat(data, not(hasDisplayItem("allowedLateness")));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6269b8fa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 403ad9d..357ab44 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -20,10 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigtable;
 import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
 import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils
-    .assertSplitAtFractionSucceedsAndConsistent;
+import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
@@ -428,7 +426,7 @@ public class BigtableIOTest {
     assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
 
     // BigtableIO adds user-agent to options; assert only on key and not value.
-    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+    assertThat(displayData, hasDisplayItem("bigtableOptions"));
   }
 
   /** Tests that a record gets written to the service and messages are logged. 
*/
@@ -494,10 +492,7 @@ public class BigtableIOTest {
         .withBigtableOptions(BIGTABLE_OPTIONS);
 
     DisplayData displayData = DisplayData.from(write);
-
     assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
-    // BigtableIO adds user-agent to options; assert only on key and not value.
-    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
   }
 
   
////////////////////////////////////////////////////////////////////////////////////////////


Reply via email to