Windowed side input test.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90073768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90073768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90073768

Branch: refs/heads/python-sdk
Commit: 90073768985ad492e5c0dccf1114deefcae06b13
Parents: 6e6d89d
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Oct 11 16:43:41 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Oct 18 12:17:15 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/sideinputs_test.py   | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90073768/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py 
b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 8e292e3..68deba8 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -22,6 +22,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam.transforms import window
+from apache_beam.transforms.util import assert_that, equal_to
 
 
 class SideInputsTest(unittest.TestCase):
@@ -34,6 +35,36 @@ class SideInputsTest(unittest.TestCase):
       # pylint: disable=expression-not-assigned
       pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc))
 
+  def run_windowed_side_inputs(self, elements, main_window_fn,
+                               side_window_fn=None,
+                               side_input_type=beam.pvalue.AsList,
+                               combine_fn=None,
+                               expected=None):
+    with beam.Pipeline('DirectPipelineRunner') as p:
+      pcoll = p | beam.Create(elements) | beam.Map(
+          lambda t: window.TimestampedValue(t, t))
+      main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)
+      side = pcoll | 'WindowSide' >> beam.WindowInto(
+          side_window_fn or main_window_fn)
+      if combine_fn is not None:
+        side |= beam.CombineGlobally(combine_fn)
+      res = main | beam.Map(lambda x, s: (x, s), side_input_type(side))
+      if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
+        res |= beam.Map(lambda (x, s): (x, sorted(s)))
+      assert_that(res, equal_to(expected))
+
+  def test_global_global_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 3],
+        window.GlobalWindows(),
+        expected=[(1, [1, 2, 3]), (2, [1, 2, 3]), (3, [1, 2, 3])])
+
+  def test_same_fixed_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)

Reply via email to