Windowed side input test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90073768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90073768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90073768

Branch: refs/heads/python-sdk
Commit: 90073768985ad492e5c0dccf1114deefcae06b13
Parents: 6e6d89d
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Oct 11 16:43:41 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Oct 18 12:17:15 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/sideinputs_test.py | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90073768/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 8e292e3..68deba8 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -22,6 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.transforms import window
+from apache_beam.transforms.util import assert_that, equal_to
 
 
 class SideInputsTest(unittest.TestCase):
@@ -34,6 +35,36 @@ class SideInputsTest(unittest.TestCase):
     # pylint: disable=expression-not-assigned
     pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc))
 
+  def run_windowed_side_inputs(self, elements, main_window_fn,
+                               side_window_fn=None,
+                               side_input_type=beam.pvalue.AsList,
+                               combine_fn=None,
+                               expected=None):
+    with beam.Pipeline('DirectPipelineRunner') as p:
+      pcoll = p | beam.Create(elements) | beam.Map(
+          lambda t: window.TimestampedValue(t, t))
+      main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)
+      side = pcoll | 'WindowSide' >> beam.WindowInto(
+          side_window_fn or main_window_fn)
+      if combine_fn is not None:
+        side |= beam.CombineGlobally(combine_fn)
+      res = main | beam.Map(lambda x, s: (x, s), side_input_type(side))
+      if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
+        res |= beam.Map(lambda (x, s): (x, sorted(s)))
+      assert_that(res, equal_to(expected))
+
+  def test_global_global_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 3],
+        window.GlobalWindows(),
+        expected=[(1, [1, 2, 3]), (2, [1, 2, 3]), (3, [1, 2, 3])])
+
+  def test_same_fixed_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)