Repository: incubator-beam
Updated Branches:
  refs/heads/master ffbfc66e1 -> cc448e976


[BEAM-196] provide PipelineOptions in DoFn

- fixes NPE when accessing the PipelineOptions
- adds a test to verify that the PipelineOptions are available


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eced106e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eced106e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eced106e

Branch: refs/heads/master
Commit: eced106e50ddb257524a7826ab7d27254be89da8
Parents: d10ae23
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Jun 7 13:57:33 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Jun 8 15:19:50 2016 +0200

----------------------------------------------------------------------
 .../streaming/FlinkAbstractParDoWrapper.java    | 11 ++-
 .../beam/runners/flink/PipelineOptionsTest.java | 97 +++++++++++++++++++-
 2 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 117303c..a935011 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
@@ -52,7 +54,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
 
   private final DoFn<IN, OUTDF> doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
-  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedPipelineOptions;
 
   private DoFnProcessContext context;
 
@@ -62,7 +64,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
     Preconditions.checkNotNull(doFn);
 
     this.doFn = doFn;
-    this.options = options;
+    this.serializedPipelineOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
   }
 
@@ -107,7 +109,8 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
 
     private WindowedValue<IN> element;
 
-    private DoFnProcessContext(DoFn<IN, OUTDF> function, 
Collector<WindowedValue<OUTFL>> outCollector) {
+    private DoFnProcessContext(DoFn<IN, OUTDF> function,
+          Collector<WindowedValue<OUTFL>> outCollector) {
       function.super();
       super.setupDelegateAggregators();
 
@@ -156,7 +159,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return options;
+      return serializedPipelineOptions.getPipelineOptions();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 464c6df..d571f31 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -18,14 +18,29 @@
 package org.apache.beam.runners.flink;
 
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the serialization and deserialization of PipelineOptions.
@@ -58,11 +73,85 @@ public class PipelineOptionsTest {
 
   @Test
   public void testCaching() {
-    MyOptions deserializedOptions = 
serializedOptions.getPipelineOptions().as(MyOptions.class);
+    PipelineOptions deserializedOptions = 
serializedOptions.getPipelineOptions().as(PipelineOptions.class);
     assertNotNull(deserializedOptions);
-    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
-    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
-    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+  }
+
+  @Test(expected = Exception.class)
+  public void testNonNull() {
+    new SerializedPipelineOptions(null);
+  }
+
+  @Test(expected = Exception.class)
+  public void ParDoBaseClassPipelineOptionsNullTest() {
+    new TestParDoWrapper(null, WindowingStrategy.globalDefault(), new 
TestDoFn());
+  }
+
+  /**
+   * Tests that PipelineOptions are present after serialization
+   */
+  @Test
+  public void ParDoBaseClassPipelineOptionsSerializationTest() throws 
Exception {
+    TestParDoWrapper wrapper =
+        new TestParDoWrapper(options, WindowingStrategy.globalDefault(), new 
TestDoFn());
+
+    final byte[] serialized = SerializationUtils.serialize(wrapper);
+    TestParDoWrapper deserialize = (TestParDoWrapper) 
SerializationUtils.deserialize(serialized);
+
+    // execute once to access options
+    deserialize.flatMap(
+        WindowedValue.of(
+            new Object(),
+            Instant.now(),
+            GlobalWindow.INSTANCE,
+            PaneInfo.NO_FIRING),
+        Mockito.mock(Collector.class));
+
   }
 
+
+  private static class TestDoFn extends DoFn<Object, Object> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      Assert.assertNotNull(c.getPipelineOptions());
+      Assert.assertEquals(
+          options.getTestOption(),
+          c.getPipelineOptions().as(MyOptions.class).getTestOption());
+    }
+  }
+
+  private static class TestParDoWrapper extends FlinkAbstractParDoWrapper {
+    public TestParDoWrapper(PipelineOptions options, WindowingStrategy 
windowingStrategy, DoFn doFn) {
+      super(options, windowingStrategy, doFn);
+    }
+
+
+    @Override
+    public WindowingInternals windowingInternalsHelper(
+        WindowedValue inElement,
+        Collector outCollector) {
+      return null;
+    }
+
+    @Override
+    public void sideOutputWithTimestampHelper(
+        WindowedValue inElement,
+        Object output,
+        Instant timestamp,
+        Collector outCollector,
+        TupleTag tag) {}
+
+    @Override
+    public void outputWithTimestampHelper(
+        WindowedValue inElement,
+        Object output,
+        Instant timestamp,
+        Collector outCollector) {}
+  }
+
+
 }

Reply via email to