Repository: beam
Updated Branches:
  refs/heads/master 9c4a784bb -> caaa64dc6


BEAM-1443: Preserve window duplication on window assignment.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71264671
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71264671
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71264671

Branch: refs/heads/master
Commit: 712646714b8efec6b024ebd2ba0f0e066280f36e
Parents: 9c4a784
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed Feb 8 17:58:32 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Feb 10 09:47:47 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py         | 13 +++++++++----
 sdks/python/apache_beam/transforms/window_test.py | 16 ++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 6f86ca0..2c1032d 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -224,7 +224,7 @@ class DoFnRunner(Receiver):
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_inputs and len(element.windows) > 1:
+    if self.has_windowed_inputs and len(element.windows) != 1:
       for w in element.windows:
         self._dofn_per_window_invoker(
             WindowedValue(element.value, element.timestamp, (w,)))
@@ -280,7 +280,7 @@ class DoFnRunner(Receiver):
     else:
       raise
 
-  def _process_outputs(self, element, results):
+  def _process_outputs(self, windowed_input_element, results):
     """Dispatch the result of computation to the appropriate receivers.
 
     A value wrapped in a SideOutputValue object will be unwrapped and
@@ -297,7 +297,10 @@ class DoFnRunner(Receiver):
         result = result.value
       if isinstance(result, WindowedValue):
         windowed_value = result
-      elif element is None:
+        if (windowed_input_element is not None
+            and len(windowed_input_element.windows) != 1):
+          windowed_value.windows *= len(windowed_input_element.windows)
+      elif windowed_input_element is None:
         # Start and finish have no element from which to grab context,
         # but may emit elements.
         if isinstance(result, TimestampedValue):
@@ -315,8 +318,10 @@ class DoFnRunner(Receiver):
         windowed_value = WindowedValue(
             result.value, result.timestamp,
             self.window_fn.assign(assign_context))
+        if len(windowed_input_element.windows) != 1:
+          windowed_value.windows *= len(windowed_input_element.windows)
       else:
-        windowed_value = element.with_value(result)
+        windowed_value = windowed_input_element.with_value(result)
       if tag is None:
         self.main_receivers.receive(windowed_value)
       else:

http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1a21709..c4072ac 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -176,6 +176,22 @@ class WindowTest(unittest.TestCase):
                                   ('key', [5, 6, 7, 8, 9])]))
     p.run()
 
+  def test_rewindow(self):  # BEAM-1443: rewindowing keeps window duplicates
+    p = TestPipeline()
+    result = (p
+              | Create([(k, k) for k in range(10)])
+              | Map(lambda (x, t): TimestampedValue(x, t))
+              | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+              # Each element now belongs to size / period = 3 windows.
+              # Rewindowing (here twice) must keep exactly 3 copies.
+              | 'rewindow' >> WindowInto(FixedWindows(5))
+              | 'rewindow2' >> WindowInto(FixedWindows(5))
+              | Map(lambda v: ('key', v))
+              | GroupByKey())
+    assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+                                  ('key', sorted([5, 6, 7, 8, 9] * 3))]))
+    p.run()
+
   def test_timestamped_with_combiners(self):
     p = TestPipeline()
     result = (p

Reply via email to