Reject all timers in ParDo, for now
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9712f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9712f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9712f2b

Branch: refs/heads/master
Commit: f9712f2bacb9aac9d5df5c6021bb3cfb59758806
Parents: ccefc6f
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Oct 18 13:09:57 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/ParDo.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9712f2b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 776f768..8aa87e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -563,12 +563,29 @@ public class ParDo {
               DoFn.class.getSimpleName()));
     }
 
+    // To be removed when the features are complete and runners have their own adequate
+    // rejection logic
+    if (!signature.timerDeclarations().isEmpty()) {
+      throw new UnsupportedOperationException(
+          String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
+              DoFn.TimerId.class.getSimpleName(),
+              fn.getClass().getName(),
+              DoFn.class.getSimpleName()));
+    }
+
     // State is semantically incompatible with splitting
     if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException(
           String.format("%s is splittable and uses state, but these are not compatible",
               fn.getClass().getName()));
     }
+
+    // Timers are semantically incompatible with splitting
+    if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
+      throw new UnsupportedOperationException(
+          String.format("%s is splittable and uses timers, but these are not compatible",
+              fn.getClass().getName()));
+    }
   }
 
   private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {