Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4c7d10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4c7d10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4c7d10 Branch: refs/heads/master Commit: 3b4c7d103c07e73d30b2ad534a17b3059232dbda Parents: 8af13b0 Author: Kenneth Knowles <k...@google.com> Authored: Fri Dec 16 13:43:54 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 16 20:14:19 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunnerTest.java | 53 ++++++++++++++++++++ 1 file changed, 53 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3b4c7d10/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index f068c19..837a162 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -151,6 +151,49 @@ public class SimpleDoFnRunnerTest { TimeDomain.EVENT_TIME); } + @Test + public void testStartBundleExceptionsWrappedAsUserCodeException() { + ThrowingDoFn fn = new ThrowingDoFn(); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + thrown.expect(UserCodeException.class); + thrown.expectCause(is(fn.exceptionToThrow)); + + runner.startBundle(); + } + + @Test + public void testFinishBundleExceptionsWrappedAsUserCodeException() { + ThrowingDoFn fn = new ThrowingDoFn(); + DoFnRunner<String, String> runner = + new SimpleDoFnRunner<>( + null, + fn, + null, + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + thrown.expect(UserCodeException.class); + thrown.expectCause(is(fn.exceptionToThrow)); + + runner.finishBundle(); + } + + /** * Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying * {@link DoFn}. @@ -200,6 +243,16 @@ public class SimpleDoFnRunnerTest { @TimerId(TIMER_ID) private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + @StartBundle + public void startBundle(Context c) throws Exception { + throw exceptionToThrow; + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + throw exceptionToThrow; + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { throw exceptionToThrow;