Repository: incubator-beam
Updated Branches:
  refs/heads/master ffbfc66e1 -> cc448e976
[BEAM-196] provide PipelineOptions in DoFn

- fixes NPE when accessing the PipelineOptions
- adds a test to verify that the PipelineOptions are available

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eced106e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eced106e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eced106e

Branch: refs/heads/master
Commit: eced106e50ddb257524a7826ab7d27254be89da8
Parents: d10ae23
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Jun 7 13:57:33 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Jun 8 15:19:50 2016 +0200

----------------------------------------------------------------------
 .../streaming/FlinkAbstractParDoWrapper.java | 11 ++-
 .../beam/runners/flink/PipelineOptionsTest.java | 97 +++++++++++++++++++-
 2 files changed, 100 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 117303c..a935011 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -37,6 +38,7 @@ import com.google.common.base.Preconditions; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; @@ -52,7 +54,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl private final DoFn<IN, OUTDF> doFn; private final WindowingStrategy<?, ?> windowingStrategy; - private transient PipelineOptions options; + private final SerializedPipelineOptions serializedPipelineOptions; private DoFnProcessContext context; @@ -62,7 +64,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl Preconditions.checkNotNull(doFn); this.doFn = doFn; - this.options = options; + this.serializedPipelineOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; } @@ -107,7 +109,8 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl private WindowedValue<IN> element; - private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { + private DoFnProcessContext(DoFn<IN, OUTDF> function, + Collector<WindowedValue<OUTFL>> outCollector) { function.super(); super.setupDelegateAggregators(); @@ -156,7 +159,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl @Override public PipelineOptions getPipelineOptions() { - return options; + return serializedPipelineOptions.getPipelineOptions(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 464c6df..d571f31 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -18,14 +18,29 @@ package org.apache.beam.runners.flink; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.commons.lang.SerializationUtils; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests the serialization and deserialization of PipelineOptions. 
@@ -58,11 +73,85 @@ public class PipelineOptionsTest { @Test public void testCaching() { - MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class); + PipelineOptions deserializedOptions = serializedOptions.getPipelineOptions().as(PipelineOptions.class); assertNotNull(deserializedOptions); - assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); - assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); - assertEquals(deserializedOptions, serializedOptions.getPipelineOptions()); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + assertTrue(deserializedOptions == serializedOptions.getPipelineOptions()); + } + + @Test(expected = Exception.class) + public void testNonNull() { + new SerializedPipelineOptions(null); + } + + @Test(expected = Exception.class) + public void ParDoBaseClassPipelineOptionsNullTest() { + new TestParDoWrapper(null, WindowingStrategy.globalDefault(), new TestDoFn()); + } + + /** + * Tests that PipelineOptions are present after serialization + */ + @Test + public void ParDoBaseClassPipelineOptionsSerializationTest() throws Exception { + TestParDoWrapper wrapper = + new TestParDoWrapper(options, WindowingStrategy.globalDefault(), new TestDoFn()); + + final byte[] serialized = SerializationUtils.serialize(wrapper); + TestParDoWrapper deserialize = (TestParDoWrapper) SerializationUtils.deserialize(serialized); + + // execute once to access options + deserialize.flatMap( + WindowedValue.of( + new Object(), + Instant.now(), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING), + Mockito.mock(Collector.class)); + } + + private static class TestDoFn extends DoFn<Object, Object> { + + @Override + public void processElement(ProcessContext c) throws Exception { + Assert.assertNotNull(c.getPipelineOptions()); + Assert.assertEquals( + options.getTestOption(), + 
c.getPipelineOptions().as(MyOptions.class).getTestOption()); + } + } + + private static class TestParDoWrapper extends FlinkAbstractParDoWrapper { + public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) { + super(options, windowingStrategy, doFn); + } + + + @Override + public WindowingInternals windowingInternalsHelper( + WindowedValue inElement, + Collector outCollector) { + return null; + } + + @Override + public void sideOutputWithTimestampHelper( + WindowedValue inElement, + Object output, + Instant timestamp, + Collector outCollector, + TupleTag tag) {} + + @Override + public void outputWithTimestampHelper( + WindowedValue inElement, + Object output, + Instant timestamp, + Collector outCollector) {} + } + + }