Disallow (unimplemented) windowed side inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/552f6d7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/552f6d7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/552f6d7b Branch: refs/heads/python-sdk Commit: 552f6d7baa6e6205b290d21e642911a04ca259ec Parents: 24b7bcc Author: Robert Bradshaw <rober...@gmail.com> Authored: Fri Oct 7 15:28:50 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Oct 18 10:48:16 2016 -0700 ---------------------------------------------------------------------- .../python/apache_beam/transforms/sideinputs.py | 4 ++ .../apache_beam/transforms/sideinputs_test.py | 40 ++++++++++++++++++++ 2 files changed, 44 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/552f6d7b/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 6484a7c..6c698da 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -50,6 +50,10 @@ class CreatePCollectionView(PTransform): return input_type def apply(self, pcoll): + if not pcoll.windowing.is_default(): + raise ValueError( + "Side inputs only supported for global windows, default triggering. 
" + "Found %s" % pcoll.windowing) return self.view http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/552f6d7b/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py new file mode 100644 index 0000000..8e292e3 --- /dev/null +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for side inputs.""" + +import logging +import unittest + +import apache_beam as beam +from apache_beam.transforms import window + + +class SideInputsTest(unittest.TestCase): + + # TODO(BEAM-733): Actually support this. + def test_no_sideinput_windowing(self): + p = beam.Pipeline('DirectPipelineRunner') + pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10)) + with self.assertRaises(ValueError): + # pylint: disable=expression-not-assigned + pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.DEBUG) + unittest.main()