Repository: incubator-beam
Updated Branches:
  refs/heads/master 4755c5a78 -> bb086b8d3


Fix bug in PipelineOptions DisplayData serialization

PipelineOptions has been improved to generate display data
to be consumed by a runner and used for display. However, there
was a bug in the ProxyInvocationHandler implementation of
PipelineOptions display data which was causing NullPointerExceptions
when generating display data from PipelineOptions previously
deserialized from JSON.

This change also makes our error handling for display data exceptions
consistent across the Dataflow runner: exceptions thrown during
display data population will propagate out and cause the pipeline to
fail. This is consistent with other user code which may throw
exceptions at pipeline construction time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e669c44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e669c44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e669c44

Branch: refs/heads/master
Commit: 1e669c44c9d2448b55f5bdba3dcff1831b2cd8b4
Parents: 4755c5a
Author: Scott Wegner <sweg...@google.com>
Authored: Thu May 19 09:17:37 2016 -0700
Committer: Scott Wegner <sweg...@google.com>
Committed: Fri May 20 17:20:29 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 50 +----------
 .../DataflowPipelineTranslatorTest.java         | 63 --------------
 .../sdk/options/ProxyInvocationHandler.java     |  4 +-
 .../sdk/transforms/display/DisplayData.java     | 14 +--
 .../sdk/options/ProxyInvocationHandlerTest.java |  6 +-
 .../sdk/transforms/display/DisplayDataTest.java | 91 ++++++++++++++------
 6 files changed, 83 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 7f67393..f5fefc0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -29,7 +29,6 @@ import static org.apache.beam.sdk.util.Structs.addString;
 import static org.apache.beam.sdk.util.Structs.getString;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import 
org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
@@ -87,8 +86,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -726,18 +723,7 @@ public class DataflowPipelineTranslator {
     }
 
     private void addDisplayData(String stepName, HasDisplayData 
hasDisplayData) {
-      DisplayData displayData;
-      try {
-        displayData = DisplayData.from(hasDisplayData);
-      } catch (Exception e) {
-        String msg = String.format("Exception thrown while collecting display 
data for step: %s. "
-            + "Display data will be not be available for this step.", 
stepName);
-        DisplayDataException displayDataException = new 
DisplayDataException(msg, e);
-        LOG.warn(msg, displayDataException);
-
-        displayData = displayDataException.asDisplayData();
-      }
-
+      DisplayData displayData = DisplayData.from(hasDisplayData);
       List<Map<String, Object>> list = MAPPER.convertValue(displayData, 
List.class);
       addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
     }
@@ -1056,38 +1042,4 @@ public class DataflowPipelineTranslator {
       context.addOutput(tag.getId(), output);
     }
   }
-
-  /**
-   * Wraps exceptions thrown while collecting {@link DisplayData} for the 
Dataflow pipeline runner.
-   */
-  static class DisplayDataException extends Exception implements 
HasDisplayData {
-    public DisplayDataException(String message, Throwable cause) {
-      super(checkNotNull(message), checkNotNull(cause));
-    }
-
-    /**
-     * Retrieve a display data representation of the exception, which can be 
submitted to
-     * the service in place of the actual display data.
-     */
-    public DisplayData asDisplayData() {
-      return DisplayData.from(this);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      Throwable cause = getCause();
-      builder
-        .add(DisplayData.item("exceptionMessage", getMessage()))
-        .add(DisplayData.item("exceptionType", cause.getClass()))
-        .add(DisplayData.item("exceptionCause", cause.getMessage()))
-        .add(DisplayData.item("stackTrace", stackTraceToString()));
-    }
-
-    private String stackTraceToString() {
-      StringWriter stringWriter = new StringWriter();
-      PrintWriter printWriter = new PrintWriter(stringWriter);
-      printStackTrace(printWriter);
-      return stringWriter.toString();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 58c6f75..165d2b5 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -23,9 +23,7 @@ import static org.apache.beam.sdk.util.Structs.getString;
 
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -50,7 +48,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.RecordingPipelineVisitor;
-import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -80,7 +77,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
-import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -106,9 +102,7 @@ import java.util.Map;
  */
 @RunWith(JUnit4.class)
 public class DataflowPipelineTranslatorTest implements Serializable {
-
   @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs logs = 
ExpectedLogs.none(DataflowPipelineTranslator.class);
 
   // A Custom Mockito matcher for an initial Job that checks that all
   // expected fields are set.
@@ -973,61 +967,4 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
     assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
   }
-
-  @Test
-  public void testCapturesDisplayDataExceptions() throws IOException {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
-    Pipeline pipeline = Pipeline.create(options);
-
-    final RuntimeException displayDataException = new 
RuntimeException("foobar");
-    pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(ParDo.of(new DoFn<Integer, Integer>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            c.output(c.element());
-          }
-
-          @Override
-          public void populateDisplayData(DisplayData.Builder builder) {
-            throw displayDataException;
-          }
-        }));
-
-    Job job = translator.translate(
-        pipeline,
-        (DataflowPipelineRunner) pipeline.getRunner(),
-        Collections.<DataflowPackage>emptyList()).getJob();
-
-    String expectedMessage = "Display data will be not be available for this 
step";
-    logs.verifyWarn(expectedMessage);
-
-    List<Step> steps = job.getSteps();
-    assertEquals("Job should have 2 steps", 2, steps.size());
-
-    @SuppressWarnings("unchecked")
-    Iterable<Map<String, String>> displayData = (Collection<Map<String, 
String>>) steps.get(1)
-        .getProperties().get("display_data");
-
-    String namespace = 
DataflowPipelineTranslator.DisplayDataException.class.getName();
-    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
-      hasEntry("namespace", namespace),
-      hasEntry("key", "exceptionType"),
-      hasEntry("value", RuntimeException.class.getName()))));
-
-    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
-        hasEntry("namespace", namespace),
-        hasEntry("key", "exceptionMessage"),
-        hasEntry(is("value"), Matchers.containsString(expectedMessage)))));
-
-    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
-        hasEntry("namespace", namespace),
-        hasEntry("key", "exceptionCause"),
-        hasEntry("value", "foobar"))));
-
-    Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
-        hasEntry("namespace", namespace),
-        hasEntry("key", "stackTrace"))));
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 159eb5b..3292a7f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -315,6 +315,7 @@ class ProxyInvocationHandler implements InvocationHandler {
           }
 
           Object value = getValueFromJson(jsonOption.getKey(), 
spec.getGetterMethod());
+          value = value == null ? "" : value;
           DisplayData.Type type = DisplayData.inferType(value);
           if (type != null) {
             builder.add(DisplayData.item(jsonOption.getKey(), type, value)
@@ -552,7 +553,8 @@ class ProxyInvocationHandler implements InvocationHandler {
         jgen.writeObject(serializableOptions);
 
         List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
-        for (DisplayData.Item<?> item : DisplayData.from(value).items()) {
+        DisplayData displayData = DisplayData.from(value);
+        for (DisplayData.Item<?> item : displayData.items()) {
           @SuppressWarnings("unchecked")
           Map<String, Object> serializedItem = MAPPER.convertValue(item, 
Map.class);
           serializedDisplayData.add(serializedItem);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index dc6e381..9e9bdbf 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -72,10 +72,6 @@ public class DisplayData implements Serializable {
    * Collect the {@link DisplayData} from a component. This will traverse all 
subcomponents
    * specified via {@link Builder#include} in the given component. Data in 
this component will be in
    * a namespace derived from the component.
-   *
-   * <p>Pipeline runners should call this method in order to collect display 
data. While it should
-   * be safe to call {@code DisplayData.from} on any component which 
implements it, runners should
-   * be resilient to exceptions thrown while collecting display data.
    */
   public static DisplayData from(HasDisplayData component) {
     checkNotNull(component, "component argument cannot be null");
@@ -603,7 +599,15 @@ public class DisplayData implements Serializable {
       if (newComponent) {
         String prevNs = this.latestNs;
         this.latestNs = namespace;
-        subComponent.populateDisplayData(this);
+
+        try {
+          subComponent.populateDisplayData(this);
+        } catch (Throwable e) {
+          String msg = String.format("Error while populating display data for 
component: %s",
+              namespace);
+          throw new RuntimeException(msg, e);
+        }
+
         this.latestNs = prevNs;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 6fc9700..110f30a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -863,12 +863,16 @@ public class ProxyInvocationHandlerTest {
   }
 
   @Test
-  public void testDisplayDataNullValuesConvertedToEmptyString() {
+  public void testDisplayDataNullValuesConvertedToEmptyString() throws 
Exception {
     FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
     options.setFoo(null);
 
     DisplayData data = DisplayData.from(options);
     assertThat(data, hasDisplayItem("foo", ""));
+
+    FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, 
options);
+    DisplayData deserializedData = DisplayData.from(deserializedOptions);
+    assertThat(deserializedData, hasDisplayItem("foo", ""));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 21b2e33..478724b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -39,11 +40,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -69,7 +65,6 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -86,6 +81,7 @@ import java.util.regex.Pattern;
 @RunWith(JUnit4.class)
 public class DisplayDataTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
+
   private static final DateTimeFormatter ISO_FORMATTER = 
ISODateTimeFormat.dateTime();
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -413,7 +409,7 @@ public class DisplayDataTest implements Serializable {
 
   @Test
   public void testNullNamespaceOverride() {
-    thrown.expect(NullPointerException.class);
+    thrown.expectCause(isA(NullPointerException.class));
 
     DisplayData.from(new HasDisplayData() {
       @Override
@@ -516,7 +512,7 @@ public class DisplayDataTest implements Serializable {
 
   @Test
   public void testDuplicateKeyThrowsException() {
-    thrown.expect(IllegalArgumentException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
     DisplayData.from(
         new HasDisplayData() {
           @Override
@@ -752,7 +748,7 @@ public class DisplayDataTest implements Serializable {
       }
     };
 
-    thrown.expect(ClassCastException.class);
+    thrown.expectCause(isA(ClassCastException.class));
     DisplayData.from(component);
   }
 
@@ -838,7 +834,7 @@ public class DisplayDataTest implements Serializable {
 
   @Test
   public void testIncludeNull() {
-    thrown.expect(NullPointerException.class);
+    thrown.expectCause(isA(NullPointerException.class));
     DisplayData.from(
         new HasDisplayData() {
           @Override
@@ -856,7 +852,7 @@ public class DisplayDataTest implements Serializable {
       }
     };
 
-    thrown.expect(NullPointerException.class);
+    thrown.expectCause(isA(NullPointerException.class));
     DisplayData.from(new HasDisplayData() {
         @Override
         public void populateDisplayData(Builder builder) {
@@ -867,7 +863,7 @@ public class DisplayDataTest implements Serializable {
 
   @Test
   public void testNullKey() {
-    thrown.expect(NullPointerException.class);
+    thrown.expectCause(isA(NullPointerException.class));
     DisplayData.from(
         new HasDisplayData() {
           @Override
@@ -968,23 +964,66 @@ public class DisplayDataTest implements Serializable {
   }
 
   /**
-   * Validate that all runners are resilient to exceptions thrown while 
retrieving display data.
+   * Verify that {@link DisplayData.Builder} can recover from exceptions 
thrown in user code.
+   * This is not used within the Beam SDK since we want all code to produce 
valid DisplayData.
+   * This test just ensures it is possible to write custom code that does 
recover.
    */
   @Test
-  @Category(RunnableOnService.class)
-  public void testRunnersResilientToDisplayDataExceptions() {
-    Pipeline p = TestPipeline.create();
-    PCollection<Integer> pCol = p
-        .apply(Create.of(1, 2, 3))
-        .apply(new IdentityTransform<Integer>() {
-          @Override
-          public void populateDisplayData(Builder builder) {
-            throw new RuntimeException("bug!");
-          }
-        });
+  public void testCanRecoverFromBuildException() {
+    final HasDisplayData safeComponent = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add(DisplayData.item("a", "a"));
+      }
+    };
+
+    final HasDisplayData failingComponent = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        throw new RuntimeException("oh noes!");
+      }
+    };
+
+    DisplayData displayData = DisplayData.from(new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder
+            .add(DisplayData.item("b", "b"))
+            .add(DisplayData.item("c", "c"));
+
+        try {
+          builder.include(failingComponent);
+          fail("Expected exception not thrown");
+        } catch (RuntimeException e) {
+          // Expected
+        }
+
+        builder
+            .include(safeComponent)
+            .add(DisplayData.item("d", "d"));
+      }
+    });
+
+    assertThat(displayData, hasDisplayItem("a"));
+    assertThat(displayData, hasDisplayItem("b"));
+    assertThat(displayData, hasDisplayItem("c"));
+    assertThat(displayData, hasDisplayItem("d"));
+  }
 
-    PAssert.that(pCol).containsInAnyOrder(1, 2, 3);
-    p.run();
+  @Test
+  public void testExceptionMessage() {
+    final RuntimeException cause = new RuntimeException("oh noes!");
+    HasDisplayData component = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        throw cause;
+      }
+    };
+
+    thrown.expectMessage(component.getClass().getName());
+    thrown.expectCause(is(cause));
+
+    DisplayData.from(component);
   }
 
   private static class IdentityTransform<T> extends PTransform<PCollection<T>, 
PCollection<T>> {

Reply via email to