Repository: beam
Updated Branches:
  refs/heads/master 9c4a784bb -> caaa64dc6


BEAM-1443: Preserve window duplication on window assignment.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71264671
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71264671
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71264671

Branch: refs/heads/master
Commit: 712646714b8efec6b024ebd2ba0f0e066280f36e
Parents: 9c4a784
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed Feb 8 17:58:32 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Feb 10 09:47:47 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py         | 13 +++++++++----
 sdks/python/apache_beam/transforms/window_test.py | 16 ++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 6f86ca0..2c1032d 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -224,7 +224,7 @@ class DoFnRunner(Receiver):
     # Call for the process function for each window if has windowed side inputs
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
-    if self.has_windowed_inputs and len(element.windows) > 1:
+    if self.has_windowed_inputs and len(element.windows) != 1:
       for w in element.windows:
         self._dofn_per_window_invoker(
             WindowedValue(element.value, element.timestamp, (w,)))
@@ -280,7 +280,7 @@ class DoFnRunner(Receiver):
       else:
         raise
 
-  def _process_outputs(self, element, results):
+  def _process_outputs(self, windowed_input_element, results):
     """Dispatch the result of computation to the appropriate receivers.
 
     A value wrapped in a SideOutputValue object will be unwrapped and
@@ -297,7 +297,10 @@ class DoFnRunner(Receiver):
         result = result.value
       if isinstance(result, WindowedValue):
         windowed_value = result
-      elif element is None:
+        if (windowed_input_element is not None
+            and len(windowed_input_element.windows) != 1):
+          windowed_value.windows *= len(windowed_input_element.windows)
+      elif windowed_input_element is None:
         # Start and finish have no element from which to grab context,
         # but may emit elements.
         if isinstance(result, TimestampedValue):
@@ -315,8 +318,10 @@ class DoFnRunner(Receiver):
         windowed_value = WindowedValue(
             result.value, result.timestamp,
             self.window_fn.assign(assign_context))
+        if len(windowed_input_element.windows) != 1:
+          windowed_value.windows *= len(windowed_input_element.windows)
       else:
-        windowed_value = element.with_value(result)
+        windowed_value = windowed_input_element.with_value(result)
       if tag is None:
         self.main_receivers.receive(windowed_value)
       else:

http://git-wip-us.apache.org/repos/asf/beam/blob/71264671/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1a21709..c4072ac 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -176,6 +176,22 @@ class WindowTest(unittest.TestCase):
                                   ('key', [5, 6, 7, 8, 9])]))
     p.run()
 
+  def test_rewindow(self):
+    p = TestPipeline()
+    result = (p
+              | Create([(k, k) for k in range(10)])
+              | Map(lambda (x, t): TimestampedValue(x, t))
+              | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+              # Per the model, each element is now duplicated across
+              # three windows. Rewindowing must preserve this duplication.
+              | 'rewindow' >> WindowInto(FixedWindows(5))
+              | 'rewindow2' >> WindowInto(FixedWindows(5))
+              | Map(lambda v: ('key', v))
+              | GroupByKey())
+    assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+                                  ('key', sorted([5, 6, 7, 8, 9] * 3))]))
+    p.run()
+
   def test_timestamped_with_combiners(self):
     p = TestPipeline()
     result = (p