[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16280959#comment-16280959 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb closed pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 689eab7b842..ccf2516eee9 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -565,9 +565,10 @@ class WindowIntoDoFn(beam.DoFn):
     def __init__(self, windowing):
       self.windowing = windowing
 
-    def process(self, element, timestamp=beam.DoFn.TimestampParam):
+    def process(self, element, timestamp=beam.DoFn.TimestampParam,
+                window=beam.DoFn.WindowParam):
       new_windows = self.windowing.windowfn.assign(
-          WindowFn.AssignContext(timestamp, element=element))
+          WindowFn.AssignContext(timestamp, element=element, window=window))
       yield WindowedValue(element, timestamp, new_windows)
   from apache_beam.transforms.core import Windowing
   from apache_beam.transforms.window import WindowFn, WindowedValue
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 34c15f9c191..2f18bdee0b1 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -19,13 +19,16 @@
 
 from __future__ import absolute_import
 
+import collections
 import glob
 import tempfile
 
 from apache_beam import pvalue
 from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import CoGroupByKey
@@ -37,6 +40,7 @@
     'is_empty',
     # open_shards is internal and has no backwards compatibility guarantees.
     'open_shards',
+    'TestWindowedValue',
     ]
 
 
@@ -46,11 +50,32 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
+TestWindowedValue = collections.namedtuple(
+    'TestWindowedValue', 'value timestamp windows')
+
+
+def contains_in_any_order(iterable):
+  """Creates an object that matches another iterable if they both have the
+  same count of items.
+
+  Arguments:
+    iterable: An iterable of hashable objects.
+  """
+  class InAnyOrder(object):
+    def __init__(self, iterable):
+      self._counter = collections.Counter(iterable)
+
+    def __eq__(self, other):
+      return self._counter == collections.Counter(other)
+
+  return InAnyOrder(iterable)
+
+
 # Note that equal_to always sorts the expected and actual since what we
 # compare are PCollections for which there is no guaranteed order.
 # However the sorting does not go beyond top level therefore [1,2] and [2,1]
 # are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
 def equal_to(expected):
   expected = list(expected)
 
@@ -72,7 +97,7 @@ def _empty(actual):
   return _empty
 
 
-def assert_that(actual, matcher, label='assert_that'):
+def assert_that(actual, matcher, label='assert_that', reify_windows=False):
   """A PTransform that checks a PCollection has an expected value.
 
   Note that assert_that should be used only for testing pipelines since the
@@ -85,15 +110,27 @@ def assert_that(actual, matcher, label='assert_that'):
       expectations and raises BeamAssertException if they are not met.
     label: Optional string label. This is needed in case several assert_that
       transforms are introduced in the same pipeline.
+    reify_windows: If True, matcher is passed a list of TestWindowedValue.
 
   Returns:
     Ignored.
   """
   assert isinstance(actual, pvalue.PCollection)
 
+  class ReifyTimestampWindow(DoFn):
+    def process(self, element, timestamp=DoFn.TimestampParam,
+                window=DoFn.WindowParam):
+      # This returns TestWindowedValue instead of
+      # beam.utils.windowed_value.WindowedValue because ParDo will extract
+      # the timestamp and window out of the latter.
+      return [TestWindowedValue(element, timestamp, [window])]
+
   class AssertThat(PTransform):
 
     def expand(self, pcoll):
+      if reify_windows:
+        pcoll = pcoll | ParDo(ReifyTimestampWindow())
+
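
Editorial sketch (not part of the PR): the contains_in_any_order matcher added in the diff above can be tried without a pipeline. The version below copies its Counter-based comparison; the explicit __ne__ is an addition assumed here for Python 2 consistency and does not appear in the diff.

```python
import collections


def contains_in_any_order(iterable):
  """Matches any iterable holding the same items with the same counts."""
  class InAnyOrder(object):
    def __init__(self, iterable):
      self._counter = collections.Counter(iterable)

    def __eq__(self, other):
      return self._counter == collections.Counter(other)

    # Assumption (not in the diff): keep != consistent with == on Python 2.
    def __ne__(self, other):
      return not self == other

  return InAnyOrder(iterable)


# Nested order is ignored: the inner matcher equals [2, 1] as well as [1, 2].
nested_matches = [contains_in_any_order([1, 2])] == [[2, 1]]

# Counts still matter: a duplicated item makes a different multiset.
counts_differ = contains_in_any_order([1, 2]) == [1, 2, 2]
```

Because the comparison is multiset equality, such a matcher can sit inside an expected list where the plain top-level sort in equal_to would treat [[1,2]] and [[2,1]] as different.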
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275314#comment-16275314 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on issue #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#issuecomment-348653664

Looks good. Jenkins is complaining.

Jenkins: retest this please.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
>                 Key: BEAM-1872
>                 URL: https://issues.apache.org/jira/browse/BEAM-1872
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>              Labels: sdk-consistency
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275312#comment-16275312 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154481382

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window

Review comment:
Good point. Maybe call this RestoreTimestamps or RestoreWindows?
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273912#comment-16273912 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264110

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)

Review comment:
> **robertwb** wrote:
> We shouldn't have to check for this.

Done.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273911#comment-16273911 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264108

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):

Review comment:
> **robertwb** wrote:
> Reify means to make explicit. Should just be ReifyTimestamps.

Done.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273907#comment-16273907 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264105

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())

Review comment:
> **robertwb** wrote:
> Most of these explicit stage names are redundant with what the default would already be.

Done.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273908#comment-16273908 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264103

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window

Review comment:
> **robertwb** wrote:
> I'm not following this comment...
>
> Shouldn't things already be in the correct window? We just need to emit a TimestampedValue here.

Passing a TimestampedValue would make the output processor for DoFn call the windowing function.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L451
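
Editorial sketch (not part of the thread): the reply above says that yielding a TimestampedValue makes the DoFn output path call the windowing function again. The pure-Python model below, which does not use Beam, illustrates why that matters for an identity WindowFn. AssignContext here is a hypothetical namedtuple stand-in with only the fields used, and rewindow is an invented helper that models that output path.

```python
import collections

# Hypothetical stand-in for WindowFn.AssignContext; only the fields used here.
AssignContext = collections.namedtuple(
    'AssignContext', ['timestamp', 'element', 'window'])


class IdentityWindowFnSketch(object):
  """Returns the window the element is already in, as in the quoted assign()."""

  def assign(self, assign_context):
    if assign_context.window is None:
      raise ValueError('assign_context.window should not be None.')
    return [assign_context.window]


def rewindow(fn, element, timestamp, window):
  """Models the output path re-running the WindowFn for one element."""
  return fn.assign(AssignContext(timestamp, element, window))


fn = IdentityWindowFnSketch()

# Window is known (the WindowedValue-like case): it passes through unchanged.
kept = rewindow(fn, ('k', 'v'), 10, 'window-A')

# Window is unknown (the TimestampedValue-like case): identity cannot guess.
try:
  rewindow(fn, ('k', 'v'), 10, None)
  failed = False
except ValueError:
  failed = True
```

In this model an identity function has nothing to return unless the current window is supplied, which matches the error message in the quoted assign() about a DoFn returning a TimestampedValue.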
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273909#comment-16273909 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264106

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+              | 'IdentityWindow' >> WindowInto(
+                  _IdentityWindowFn(
+                      windowing_saved.windowfn.get_window_coder()),
+                  trigger=AfterCount(1),
+                  accumulation_mode=AccumulationMode.DISCARDING,
+                  timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
+                  )
+              | 'GroupByKey' >> GroupByKey()
+              | 'ExpandIterable' >> FlatMap(
+                  lambda e: [(e[0], value) for value in e[1]])
+              | 'ReifyTimestampsExtract' >> ParDo(ReifyTimestampsExtract()))
+    result._windowing = windowing_saved
+    return result
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class Reshuffle(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  Reshuffle adds a temporary random key to each element, performs a
+  ReshufflePerKey, and finally removes the temporary key.
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    return (pcoll
+            | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+            | ReshufflePerKey()
+            | 'RemoveTempKeys' >> Map(lambda t: t[1]))

Review comment:
> **robertwb** wrote:
> RemoveRandomKeys (for symmetry).

Done.
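
Editorial sketch (not part of the thread): the Reshuffle docstring quoted above describes three steps: add a temporary random key, run a per-key reshuffle built around GroupByKey, and remove the key. The plain-Python model below walks a list through the same steps. The function name reshuffle_sketch is invented for illustration, and timestamp and window handling is deliberately left out.

```python
import collections
import random


def reshuffle_sketch(elements, rng=random):
  # AddRandomKeys: pair each element with a temporary 32-bit random key.
  keyed = [(rng.getrandbits(32), e) for e in elements]

  # GroupByKey: collect the values seen under each key.
  grouped = collections.defaultdict(list)
  for key, value in keyed:
    grouped[key].append(value)

  # ExpandIterable: flatten each (key, values) group back to (key, value).
  expanded = [(k, v) for k, values in grouped.items() for v in values]

  # RemoveTempKeys: drop the temporary key again.
  return [kv[1] for kv in expanded]


data = ['a', 'b', 'c', 'a']
result = reshuffle_sketch(data)
```

The output holds the same elements with the same multiplicities as the input, which is the sense in which the docstring calls the result "equivalent to its input"; element order is not part of that guarantee.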
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273910#comment-16273910 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264107

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):

Review comment:
> **robertwb** wrote:
> ApplyReifiedTimestamps?

Done.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273679#comment-16273679 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154235616

## File path: sdks/python/apache_beam/testing/util_test.py ##
@@ -32,18 +37,56 @@ def test_assert_that_passes(self):
     with TestPipeline() as p:
       assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
 
+  def test_assert_that_passes_empty_equal_to(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), equal_to([]))
+
+  def test_assert_that_passes_empty_is_empty(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), is_empty())
+
+  def test_windowed_value_passes(self):
+    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
+                for v in [1, 2, 3]]
+    with TestPipeline() as p:
+      assert_that(p | Create([2, 3, 1]), equal_to(expected), reify_windows=True)
+
   def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
+    with self.assertRaises(BeamAssertException):

Review comment:
:/
I changed this because my code had a bug that raised another exception.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271787#comment-16271787 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153939801

## File path: sdks/python/apache_beam/testing/util_test.py ##
@@ -32,18 +37,56 @@ def test_assert_that_passes(self):
     with TestPipeline() as p:
       assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
+  def test_assert_that_passes_empty_equal_to(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), equal_to([]))
+
+  def test_assert_that_passes_empty_is_empty(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), is_empty())
+
+  def test_windowed_value_passes(self):
+    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
+                for v in [1, 2, 3]]
+    with TestPipeline() as p:
+      assert_that(p | Create([2, 3, 1]), equal_to(expected), reify_windows=True)
+
   def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
+    with self.assertRaises(BeamAssertException):

Review comment:
Not all runners preserve exceptions from remote workers. Please change back. (Same elsewhere.)
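The concern in the review comment above can be illustrated without Beam. This is a minimal sketch under stated assumptions: `run_locally` and `run_remotely` are hypothetical stand-ins for two kinds of runner, `BeamAssertException` is a local stand-in for the class in testing/util.py rather than an import, and the "remote" runner is assumed to report worker failures as a generic RuntimeError that carries only the message text.

```python
import unittest


class BeamAssertException(Exception):
  """Local stand-in for the exception defined in testing/util.py."""


def failing_matcher(actual):
  # Mimics an assert_that matcher that rejects the pipeline output.
  raise BeamAssertException('Failed assert: %r' % (actual,))


def run_locally(fn, arg):
  # Hypothetical in-process runner: the original exception propagates as is.
  return fn(arg)


def run_remotely(fn, arg):
  # Hypothetical remote runner: only the error text survives the trip back,
  # so the caller sees a different exception type.
  try:
    return fn(arg)
  except Exception as e:
    raise RuntimeError('worker failed: %s' % e)


class AssertThatFailsTest(unittest.TestCase):

  def test_portable_assertion(self):
    # Passes on both runners because it does not depend on the exact type.
    for run in (run_locally, run_remotely):
      with self.assertRaises(Exception):
        run(failing_matcher, [1, 2, 3])

  def test_type_is_lost_remotely(self):
    # A test pinned to BeamAssertException would fail on this runner.
    with self.assertRaises(RuntimeError) as cm:
      run_remotely(failing_matcher, [1, 2, 3])
    self.assertNotIsInstance(cm.exception, BeamAssertException)
```

The trade-off is the one raised earlier in the thread: asserting on the base `Exception` is portable across runners, but it can also mask an unrelated bug that happens to raise.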
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271792#comment-16271792 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153940718

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)

Review comment:
We shouldn't have to check for this.
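The `assign` method of `_IdentityWindowFn` in the hunk above can be sketched in isolation. The sketch below does not import Beam: `AssignContext` is a simplified, hypothetical stand-in for `WindowFn.AssignContext`, and windows are represented as plain `(start, end)` tuples purely for illustration.

```python
import collections

# Hypothetical, simplified stand-in for WindowFn.AssignContext.
AssignContext = collections.namedtuple(
    'AssignContext', ['timestamp', 'element', 'window'])


class IdentityWindowFnSketch(object):
  """Keeps each element in the window it already belongs to."""

  def assign(self, assign_context):
    # Without an existing window there is nothing to preserve.
    if assign_context.window is None:
      raise ValueError('assign_context.window should not be None.')
    return [assign_context.window]


fn = IdentityWindowFnSketch()
# The element stays in its original (0, 10) window.
windows = fn.assign(AssignContext(timestamp=5, element='a', window=(0, 10)))
```

This is also why the same pull request passes `window=window` into `AssignContext` in bundle_processor.py: the identity assignment needs the current window to be available on the context.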
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271790#comment-16271790 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153947324

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())

Review comment:
Most of these explicit stage names are redundant with what the default would already be.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271791#comment-16271791 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153941099

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):

Review comment:
ApplyReifiedTimestamps?
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271789#comment-16271789 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153947424

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+              | 'IdentityWindow' >> WindowInto(
+                  _IdentityWindowFn(
+                      windowing_saved.windowfn.get_window_coder()),
+                  trigger=AfterCount(1),
+                  accumulation_mode=AccumulationMode.DISCARDING,
+                  timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
+                  )
+              | 'GroupByKey' >> GroupByKey()
+              | 'ExpandIterable' >> FlatMap(
+                  lambda e: [(e[0], value) for value in e[1]])
+              | 'ReifyTimestampsExtract' >> ParDo(ReifyTimestampsExtract()))
+    result._windowing = windowing_saved
+    return result
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class Reshuffle(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  Reshuffle adds a temporary random key to each element, performs a
+  ReshufflePerKey, and finally removes the temporary key.
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    return (pcoll
+            | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+            | ReshufflePerKey()
+            | 'RemoveTempKeys' >> Map(lambda t: t[1]))

Review comment:
RemoveRandomKeys (for symmetry).
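The three steps named in the `Reshuffle` docstring above (add a temporary random key, reshuffle per key, remove the key) can be sketched on a plain Python list. This is only an illustration of the data flow, not the Beam implementation: `reshuffle_sketch` is a hypothetical name, and a dictionary stands in for GroupByKey.

```python
import collections
import random


def reshuffle_sketch(elements):
  # Step 1: attach a temporary random key to every element.
  keyed = [(random.getrandbits(32), e) for e in elements]
  # Step 2: group by key (a plain dict stands in for GroupByKey).
  grouped = collections.defaultdict(list)
  for key, value in keyed:
    grouped[key].append(value)
  # Step 3: expand each group again and drop the temporary key.
  return [value for values in grouped.values() for value in values]


output = reshuffle_sketch(['a', 'b', 'c', 'a'])
```

The output contains exactly the input elements, duplicates included; only their order and grouping may differ, which matches the "equivalent to its input" wording of the docstring.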
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271788#comment-16271788 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153941071

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window

Review comment:
I'm not following this comment... Shouldn't things already be in the correct window? We just need to emit a TimestampedValue here.
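The reify/restore round trip discussed in the comment above can be sketched without Beam. `TimestampedValue` below is a hypothetical namedtuple stand-in for the Beam class of the same name, and the two helper functions are illustrative names, not part of the pull request.

```python
import collections

# Hypothetical stand-in for apache_beam.transforms.window.TimestampedValue.
TimestampedValue = collections.namedtuple(
    'TimestampedValue', ['value', 'timestamp'])


def reify_timestamps(key, value, timestamp):
  # Make the implicit element timestamp explicit: (k, v) -> (k, (v, ts)).
  return key, TimestampedValue(value, timestamp)


def restore_timestamps(key, reified):
  # Undo the step above: return the plain pair plus the timestamp that
  # should be reattached to it.
  return (key, reified.value), reified.timestamp


element, ts = restore_timestamps(*reify_timestamps('k', 'v', 1234))
```

Because the timestamp travels inside the value, whatever timestamp the grouping step assigns in between does not affect the final result, which is consistent with the suggestion that emitting a timestamped value is all the second step needs to do.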
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271786#comment-16271786 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153940603

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):

Review comment:
Reify means to make explicit. Should just be ReifyTimestamps.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271349#comment-16271349 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on issue #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#issuecomment-347960498

retest this please
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269819#comment-16269819 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153664972

## File path: sdks/python/apache_beam/testing/util.py ##
@@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
+# Used for reifying timestamps and windows for assert_that matchers.

Review comment:
Done.
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269680#comment-16269680 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653663

## File path: sdks/python/apache_beam/testing/util.py ##
@@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
+# Used for reifying timestamps and windows for assert_that matchers.

Review comment:
> **robertwb** wrote:
> Ideally one should be able to use any callable, e.g. hamcrest matchers, rather than have to implement a windowed variant of each. I misspoke about having a windowed_equals_to; we should have an `assert_that_windowed(pcoll, equal_to([WindowedValue(...), ...]))`, or `assert_that(pcoll, equal_to([WindowedValue(...), ...]), reify_windows=True)`.

Done.
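The design point in the quoted comment, keeping matchers as ordinary callables and moving the windowing concern into the assertion helper, can be sketched in plain Python. Everything below is hypothetical and simplified: `assert_that_sketch` takes a list of `(value, timestamp, windows)` triples instead of a PCollection, `WindowedValueSketch` stands in for the `TestWindowedValue` tuple used in util_test.py, and `equal_to` is a reduced, order-insensitive imitation of the real matcher.

```python
import collections

# Hypothetical stand-in for the TestWindowedValue tuple used in the tests.
WindowedValueSketch = collections.namedtuple(
    'WindowedValueSketch', ['value', 'timestamp', 'windows'])


def equal_to(expected):
  # Order-insensitive comparison; knows nothing about windows.
  def matcher(actual):
    remaining = list(expected)
    for item in actual:
      remaining.remove(item)  # raises ValueError on an unexpected item
    if remaining:
      raise AssertionError('missing elements: %r' % (remaining,))
  return matcher


def assert_that_sketch(windowed_elements, matcher, reify_windows=False):
  # windowed_elements: iterable of (value, timestamp, windows) triples.
  if reify_windows:
    # Hand the matcher the full windowed representation.
    actual = [WindowedValueSketch(v, ts, w) for v, ts, w in windowed_elements]
  else:
    # Hand the matcher the bare values only.
    actual = [v for v, _, _ in windowed_elements]
  matcher(actual)


data = [(2, 0, ['global']), (1, 0, ['global'])]
assert_that_sketch(data, equal_to([1, 2]))
assert_that_sketch(
    data,
    equal_to([WindowedValueSketch(1, 0, ['global']),
              WindowedValueSketch(2, 0, ['global'])]),
    reify_windows=True)
```

With this split, any callable that accepts a list can serve as the matcher in either mode, so no windowed variant of each matcher is needed.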
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269688#comment-16269688 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653666

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
+    pcoll_intermediate = (pcoll
+        | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+        | 'IdentityWindow' >> WindowInto(
+            IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+            trigger=TriggerForEveryElement(),
+            accumulation_mode=AccumulationMode.DISCARDING,
+            # TODO: timestamp_combiner=

Review comment:
> **robertwb** wrote:
> This must be the min timestamp combiner.

Done, but I don't know how to test this. That is, nothing prevents the reshuffle code from moving the timestamps forwards and then backwards again when it restores them. The windows are preserved, so temporarily having the wrong timestamps during Reshuffle doesn't seem to change windowing either.
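The observation in the reply above, that the restored timestamps look the same whichever timestamp combiner is used, can be reproduced in a small plain-Python model. This is a sketch under stated assumptions only: `group_with_combiner` and `restore` are hypothetical helpers, `min` and `max` stand in for earliest and latest timestamp combiners, and nothing here models watermarks or runner behavior.

```python
def group_with_combiner(reified, combiner):
  # reified: list of (key, (value, timestamp)); combiner: min or max.
  groups = {}
  for key, (value, ts) in reified:
    groups.setdefault(key, []).append((value, ts))
  # Each group is emitted once, stamped with the combined timestamp.
  return [(key, vals, combiner(ts for _, ts in vals))
          for key, vals in groups.items()]


def restore(grouped):
  # The group timestamp is ignored; every element gets back the timestamp
  # that was reified into its value.
  return sorted((key, value, ts)
                for key, vals, _ in grouped for value, ts in vals)


reified = [('k', ('a', 10)), ('k', ('b', 3)), ('j', ('c', 7))]
earliest = group_with_combiner(reified, min)
latest = group_with_combiner(reified, max)
```

In this model the intermediate group timestamps differ between `earliest` and `latest`, yet `restore` yields identical output for both, which is why a test that only inspects the final elements cannot tell the combiners apart.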
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269683#comment-16269683 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653670

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):

Review comment:
> **robertwb** wrote:
> As mentioned, rename this ReshufflePerKey, and add a Reshuffle that appends then strips a random key (e.g. random.getrandbits(32))

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
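The reviewer's suggestion can be sketched in plain Python without Beam: a per-key reshuffle groups by the existing key, and a general reshuffle wraps it by attaching a random key first and stripping it afterwards. The function names below are hypothetical and the in-memory grouping is only a stand-in for GroupByKey; this is not the code that was merged.

```python
import collections
import random


def reshuffle_per_key(pairs):
  """In-memory stand-in for GroupByKey followed by re-expansion."""
  groups = collections.defaultdict(list)
  for key, value in pairs:
    groups[key].append(value)
  # Emit one (key, value) pair per grouped value.
  return [(key, value) for key, values in groups.items() for value in values]


def reshuffle(elements, rng=random):
  """Attaches a random 32-bit key, reshuffles per key, then strips the key."""
  keyed = [(rng.getrandbits(32), element) for element in elements]
  return [element for _, element in reshuffle_per_key(keyed)]
```

The random key only serves to spread elements across groups; the output contains the same elements as the input, possibly in a different order.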
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269687#comment-16269687 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653668

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):

Review comment:
> **robertwb** wrote:
> Just use AfterCount(1).

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269679#comment-16269679 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653661

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):

Review comment:
> **robertwb** wrote:
> A DoFn with nothing but a process method can be more simply implemented via beam.Map or beam.FlatMap.

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
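As a rough illustration of the reviewer's point, the body of ExpandIterableDoFn.process can be written as an ordinary function, which is the kind of callable that beam.FlatMap accepts. The function name below is hypothetical, and the snippet deliberately avoids importing Beam so that it runs on its own.

```python
def expand_iterable(element):
  """Turns (key, [v1, v2, ...]) into [(key, v1), (key, v2), ...]."""
  key, values = element
  return [(key, value) for value in values]


# With Beam installed, this would presumably replace the DoFn as:
#   pcoll | 'ExpandIterable' >> beam.FlatMap(expand_iterable)
# instead of:
#   pcoll | 'ExpandIterable' >> ParDo(ExpandIterableDoFn())
```

FlatMap fits here because one grouped input element produces several output elements; beam.Map would be the choice for a one-to-one function.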
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269685#comment-16269685 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653669

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?

Review comment:
> **robertwb** wrote:
> It should be able to infer.

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269689#comment-16269689 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653665

## File path: sdks/python/apache_beam/pvalue.py ##
@@ -124,6 +124,11 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
+  # TODO(ehudm): Make this internal.
+  @windowing.setter

Review comment:
> **robertwb** wrote:
> As discussed, remove this setter.

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269682#comment-16269682 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653664

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder

Review comment:
> **robertwb** wrote:
> Rename this window_coder?

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269681#comment-16269681 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653662

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):

Review comment:
> **robertwb** wrote:
> Make this private (with an underscore)?

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269684#comment-16269684 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653671

## File path: sdks/python/apache_beam/transforms/util.py ##
@@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
+    pcoll_intermediate = (pcoll
+        | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+        | 'IdentityWindow' >> WindowInto(
+            IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+            trigger=TriggerForEveryElement(),
+            accumulation_mode=AccumulationMode.DISCARDING,
+            # TODO: timestamp_combiner=
+            )
+        | 'GroupByKey' >> GroupByKey()
+        | 'ExpandIterable' >> ParDo(ExpandIterableDoFn()))
+    pcoll_intermediate.windowing = windowing_saved

Review comment:
> **robertwb** wrote:
> Nit: I'd probably apply assign windowing as the very last thing rather than on this intermediate.

Done.

> implement Reshuffle transform in python, make it experimental in Java
> ---
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269686#comment-16269686 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653667

## File path: sdks/python/apache_beam/testing/util.py ##
@@ -80,9 +125,11 @@ def assert_that(actual, matcher, label='assert_that'):
   Args:
     actual: A PCollection.
-    matcher: A matcher function taking as argument the actual value of a
-      materialized PCollection. The matcher validates this actual value against
-      expectations and raises BeamAssertException if they are not met.
+    matcher: Two options:
+      1. A matcher function taking as argument the actual value of a
+      materialized PCollection. The matcher validates this actual value
+      against expectations and raises BeamAssertException if they are not met.
+      2. An instance of WindowedValueMatcher.

Review comment:
> **udim** wrote:
> I thought of doing that, but how does assert_that know if matcher accepts values or TestWindowedValues?

Acknowledged.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Labels: sdk-consistency
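The two matcher options in the docstring above imply that assert_that decides what to pass based on the matcher's type. A minimal sketch of such a dispatch, assuming a marker base class and plain (value, timestamp, window) tuples, might look as follows. Apart from the name WindowedValueMatcher, everything here is hypothetical, and AssertionError stands in for BeamAssertException so that the snippet runs without Beam.

```python
class WindowedValueMatcher(object):
  """Hypothetical base class for matchers that expect windowed values."""

  def __call__(self, windowed_values):
    raise NotImplementedError


class EqualToWindowed(WindowedValueMatcher):
  """Hypothetical matcher comparing full (value, timestamp, window) tuples."""

  def __init__(self, expected):
    self._expected = sorted(expected)

  def __call__(self, windowed_values):
    if sorted(windowed_values) != self._expected:
      raise AssertionError(
          'Failed assert: %r != %r' % (self._expected, windowed_values))


def run_matcher(matcher, windowed_values):
  """Passes windowed tuples or bare values, depending on the matcher type."""
  if isinstance(matcher, WindowedValueMatcher):
    matcher(windowed_values)
  else:
    # Plain matcher functions only see the element values.
    matcher([value for value, _, _ in windowed_values])
```

Dispatching on the type answers the question in the comment without inspecting the matcher's signature, at the cost of requiring window-aware matchers to subclass a known class.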
[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219846#comment-16219846 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

GitHub user udim opened a pull request:

    https://github.com/apache/beam/pull/4040

    [BEAM-1872] Add IdentityWindowFn for use in Reshuffle

    BEAM-1872
    IdentityWindowFn is intended for internal use in a future implementation of Reshuffle.
    Add and pass current window to WindowFn.AssignContext, for IdentityWindowFn implementation.
    Add WindowedValueMatcher in testing/util.py, for use in IdentityWindowFn unit tests.

    Hi @robertwb. Can you please take a look?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/udim/beam reshuffle_py

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/4040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4040

commit cb5bd6dbba492221e74889c70a503c2544a828c5
Author: Udi Meiri
Date:   2017-10-25T22:45:39Z

    Add IdentityWindowFn.

    BEAM-1872
    IdentityWindowFn is intended for internal use in a future implementation of Reshuffle.
    Add and pass current window to WindowFn.AssignContext, for IdentityWindowFn implementation.
    Add WindowedValueMatcher in testing/util.py, for use in IdentityWindowFn unit tests.

> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Ahmet Altay
> Labels: sdk-consistency
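The idea described in the pull request, passing the current window through the assign context so that a window function can return it unchanged, can be sketched in plain Python as below. The namedtuple is a hypothetical stand-in for WindowFn.AssignContext, the class name is invented for this sketch, and nothing here depends on Beam.

```python
import collections

# Hypothetical stand-in for WindowFn.AssignContext with the added `window` field.
AssignContext = collections.namedtuple(
    'AssignContext', ['timestamp', 'element', 'window'])


class IdentityWindowFnSketch(object):
  """Returns the window an element is already in, leaving windowing as-is."""

  def __init__(self, window_coder):
    if window_coder is None:
      raise ValueError('window_coder should not be None')
    self._window_coder = window_coder

  def assign(self, assign_context):
    if assign_context.window is None:
      raise ValueError('assign_context.window should not be None.')
    # The element keeps exactly the window it arrived with.
    return [assign_context.window]

  def get_window_coder(self):
    return self._window_coder
```

The coder is carried along because downstream steps still need to encode the preserved windows, even though the function itself never creates any.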
[jira] [Commented] (BEAM-1872) implement Reshuffle transform
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191854#comment-16191854 ]

Ahmet Altay commented on BEAM-1872:
-----------------------------------

cc: [~kirpichov]

Reshuffle in Java is still being maintained and getting new features (e.g. {{ViaRandomKey}}) (https://github.com/apache/beam/blob/9379ca289a00cae169075728e6230a1d4a317659/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java).

It makes sense to make {{Reshuffle}} experimental in both SDKs and to implement it for Python as well.

> implement Reshuffle transform
> -----------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120826#comment-16120826 ]

Chamikara Jayalath commented on BEAM-1872:
------------------------------------------

Due to the complexities mentioned in the following links, I don't think this should be a newbie/starter issue, so I removed those tags.

https://github.com/apache/beam/pull/1036
https://lists.apache.org/thread.html/ac34c9ac665a8d9f67b0254015e44c59ea65ecc1360d4014b95d3b2e@%3Cdev.beam.apache.org%3E

> implement Reshuffle transform
> -----------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py
> Reporter: Ahmet Altay
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120824#comment-16120824 ]

Chamikara Jayalath commented on BEAM-1872:
------------------------------------------

I think it's good to keep the JIRA around, since this is currently a feature that is available in the Java SDK but not in the Python SDK. I don't think it will be removed from Java without a replacement, and at that point we should add a Python SDK version as well.

I think we should keep this JIRA around until Reshuffle is actually removed from the Java SDK.

> implement Reshuffle transform
> -----------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py
> Reporter: Ahmet Altay
> Labels: sdk-consistency
[jira] [Commented] (BEAM-1872) implement Reshuffle transform
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014360#comment-16014360 ]

Ahmet Altay commented on BEAM-1872:
-----------------------------------

Updated link: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java

Reshuffle in Java is internal and deprecated; perhaps we can drop this issue.

> implement Reshuffle transform
> -----------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py
> Reporter: Ahmet Altay
> Labels: newbie, sdk-consistency, starter
[jira] [Commented] (BEAM-1872) implement Reshuffle transform
[ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954454#comment-15954454 ]

Ahmet Altay commented on BEAM-1872:
-----------------------------------

Similar to https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java

> implement Reshuffle transform
> -----------------------------
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py
> Reporter: Ahmet Altay
> Labels: newbie, sdk-consistency, starter