Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6710251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6710251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6710251 Branch: refs/heads/python-sdk Commit: b6710251d8bb5d1968aea2258ce5878b43368dd5 Parents: 7106e88 Author: Stas Levin <stasle...@gmail.com> Authored: Sun Dec 18 18:51:31 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 20 09:55:45 2016 -0800 ---------------------------------------------------------------------- .../runners/core/PushbackSideInputDoFnRunnerTest.java | 5 ++++- .../org/apache/beam/runners/core/SplittableParDoTest.java | 10 ++++++++-- .../runners/core/UnboundedReadFromBoundedSourceTest.java | 5 +++-- 3 files changed, 15 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index a1cdbf6..251c7c2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -63,10 +64,12 @@ public class PushbackSideInputDoFnRunnerTest { private TestDoFnRunner<Integer, Integer> underlying; private PCollectionView<Integer> singletonView; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { MockitoAnnotations.initMocks(this); - TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); singletonView = created http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index cf96b66..0f0b106 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -132,9 +133,13 @@ public class SplittableParDoTest { return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty()); } + @Rule + public TestPipeline pipeline = TestPipeline.create(); + @Test public void testBoundednessForBoundedFn() { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + DoFn<Integer, String> boundedFn = new BoundedFakeFn(); assertEquals( "Applying a bounded SDF to a bounded collection produces a bounded collection", @@ -154,7 +159,8 @@ public class SplittableParDoTest { @Test public void testBoundednessForUnboundedFn() { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + DoFn<Integer, String> unboundedFn = new UnboundedFakeFn(); assertEquals( "Applying an unbounded SDF to a bounded collection produces a bounded collection", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index 7fd8807..86450f2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -78,6 +78,9 @@ public class UnboundedReadFromBoundedSourceTest { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule + public TestPipeline p = TestPipeline.create(); + @Test public void testCheckpointCoderNulls() throws Exception { CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of()); @@ -97,8 +100,6 @@ public class UnboundedReadFromBoundedSourceTest { UnboundedSource<Long, Checkpoint<Long>> unboundedSource = new BoundedToUnboundedSourceAdapter<>(boundedSource); - Pipeline p = TestPipeline.create(); - PCollection<Long> output = p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));