[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16280959#comment-16280959
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb closed pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in 
Reshuffle
URL: https://github.com/apache/beam/pull/4040
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 689eab7b842..ccf2516eee9 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -565,9 +565,10 @@ class WindowIntoDoFn(beam.DoFn):
     def __init__(self, windowing):
       self.windowing = windowing
 
-    def process(self, element, timestamp=beam.DoFn.TimestampParam):
+    def process(self, element, timestamp=beam.DoFn.TimestampParam,
+                window=beam.DoFn.WindowParam):
       new_windows = self.windowing.windowfn.assign(
-          WindowFn.AssignContext(timestamp, element=element))
+          WindowFn.AssignContext(timestamp, element=element, window=window))
       yield WindowedValue(element, timestamp, new_windows)
   from apache_beam.transforms.core import Windowing
   from apache_beam.transforms.window import WindowFn, WindowedValue
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 34c15f9c191..2f18bdee0b1 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -19,13 +19,16 @@
 
 from __future__ import absolute_import
 
+import collections
 import glob
 import tempfile
 
 from apache_beam import pvalue
 from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.core import Map
+from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import CoGroupByKey
@@ -37,6 +40,7 @@
     'is_empty',
     # open_shards is internal and has no backwards compatibility guarantees.
     'open_shards',
+    'TestWindowedValue',
     ]
 
 
@@ -46,11 +50,32 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
+TestWindowedValue = collections.namedtuple(
+    'TestWindowedValue', 'value timestamp windows')
+
+
+def contains_in_any_order(iterable):
+  """Creates an object that matches another iterable if they both have the
+  same count of items.
+
+  Arguments:
+    iterable: An iterable of hashable objects.
+  """
+  class InAnyOrder(object):
+    def __init__(self, iterable):
+      self._counter = collections.Counter(iterable)
+
+    def __eq__(self, other):
+      return self._counter == collections.Counter(other)
+
+  return InAnyOrder(iterable)
+
+
 # Note that equal_to always sorts the expected and actual since what we
 # compare are PCollections for which there is no guaranteed order.
 # However the sorting does not go beyond top level therefore [1,2] and [2,1]
 # are considered equal and [[1,2]] and [[2,1]] are not.
-# TODO(silviuc): Add contains_in_any_order-style matchers.
 def equal_to(expected):
   expected = list(expected)
 
@@ -72,7 +97,7 @@ def _empty(actual):
   return _empty
 
 
-def assert_that(actual, matcher, label='assert_that'):
+def assert_that(actual, matcher, label='assert_that', reify_windows=False):
   """A PTransform that checks a PCollection has an expected value.
 
   Note that assert_that should be used only for testing pipelines since the
@@ -85,15 +110,27 @@ def assert_that(actual, matcher, label='assert_that'):
       expectations and raises BeamAssertException if they are not met.
     label: Optional string label. This is needed in case several assert_that
       transforms are introduced in the same pipeline.
+    reify_windows: If True, matcher is passed a list of TestWindowedValue.
 
   Returns:
     Ignored.
   """
   assert isinstance(actual, pvalue.PCollection)
 
+  class ReifyTimestampWindow(DoFn):
+    def process(self, element, timestamp=DoFn.TimestampParam,
+                window=DoFn.WindowParam):
+      # This returns TestWindowedValue instead of
+      # beam.utils.windowed_value.WindowedValue because ParDo will extract
+      # the timestamp and window out of the latter.
+      return [TestWindowedValue(element, timestamp, [window])]
+
   class AssertThat(PTransform):
 
     def expand(self, pcoll):
+      if reify_windows:
+        pcoll = pcoll | ParDo(ReifyTimestampWindow())
+
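The comment above equal_to notes that it sorts only the top level. The new contains_in_any_order matcher instead compares collections.Counter objects, so element order is ignored while duplicate counts must still agree, and items need to be hashable rather than sortable. A standalone sketch of the same idea follows; the `__ne__` and `__repr__` methods are convenience additions that are not part of the diff.

```python
import collections


def contains_in_any_order(iterable):
  """Matches any iterable holding the same items with the same counts.

  Mirrors the Counter-based matcher in the diff; items must be hashable.
  """
  class InAnyOrder(object):
    def __init__(self, iterable):
      self._counter = collections.Counter(iterable)

    def __eq__(self, other):
      return self._counter == collections.Counter(other)

    def __ne__(self, other):
      # Python 2 does not derive != from __eq__, so define it explicitly.
      return not self == other

    def __repr__(self):
      return 'InAnyOrder(%r)' % dict(self._counter)

  return InAnyOrder(iterable)


matcher = contains_in_any_order([1, 2, 2, 3])
print(matcher == [3, 2, 1, 2])  # same items, same counts, different order
print(matcher == [1, 2, 3])     # one of the duplicate 2s is missing
```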
 

[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275314#comment-16275314
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on issue #4040: [BEAM-1872] Add IdentityWindowFn for use in 
Reshuffle
URL: https://github.com/apache/beam/pull/4040#issuecomment-348653664
 
 
   Looks good. Jenkins is complaining. 
   
   Jenkins: retest this please. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>  Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275312#comment-16275312
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154481382
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
 
 Review comment:
   Good point. Maybe call this RestoreTimestamps or RestoreWindows?




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273912#comment-16273912
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264110
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
 
 Review comment:
   
   
   > **robertwb** wrote:
   > We shouldn't have to check for this.
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273911#comment-16273911
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264108
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Reify means to make explicit. Should just be ReifyTimestamps.
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273907#comment-16273907
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264105
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Most of these explicit stage names are redundant with what the default
   > would already be.
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273908#comment-16273908
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264103
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
 
 Review comment:
   
   
   > **robertwb** wrote:
   > I'm not following this comment...
   > 
   > Shouldn't things already be in the correct window? We just need to emit a
   > TimestampedValue here.
   
   
   Passing a TimestampedValue would make the DoFn's output processor call the
   windowing function again to assign windows, and no existing window is
   available to it at that point:
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L451
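A toy model of the behavior described in this reply, written without Beam so it runs standalone. `ToyWindowedValue`, `ToyTimestampedValue`, `AssignContext`, `IdentityWindowFn` and `process_output` are hypothetical stand-ins loosely modeled on the classes in the diff and on the output-processor branch linked above; they are not Beam's API. The sketch shows why ReifyTimestampsExtract yields a WindowedValue: a TimestampedValue sends the element back through the WindowFn with no window in the assign context, which the identity WindowFn rejects.

```python
import collections

# Hypothetical stand-ins for Beam's value wrappers (not the real classes).
ToyWindowedValue = collections.namedtuple(
    'ToyWindowedValue', 'value timestamp windows')
ToyTimestampedValue = collections.namedtuple(
    'ToyTimestampedValue', 'value timestamp')


class AssignContext(object):
  """Carries what a WindowFn may inspect; window is None if unknown."""

  def __init__(self, timestamp, element=None, window=None):
    self.timestamp = timestamp
    self.element = element
    self.window = window


class IdentityWindowFn(object):
  """Keeps whatever window the element already had."""

  def assign(self, context):
    if context.window is None:
      raise ValueError('assign_context.window should not be None. '
                       'This might be due to a DoFn returning a '
                       'TimestampedValue.')
    return [context.window]


def process_output(result, input_value, window_fn):
  """Sketch of how a DoFn output is turned into a windowed value."""
  if isinstance(result, ToyWindowedValue):
    # Already windowed: passed through untouched.
    return result
  if isinstance(result, ToyTimestampedValue):
    # New timestamp: windows are recomputed, and no window is supplied.
    context = AssignContext(result.timestamp, element=result.value)
    return ToyWindowedValue(result.value, result.timestamp,
                            window_fn.assign(context))
  # Plain value: inherits the input's timestamp and windows.
  return input_value._replace(value=result)


window_fn = IdentityWindowFn()
incoming = ToyWindowedValue(('k', 'v'), 10, ['window-A'])

print(process_output(ToyWindowedValue(('k', 'v'), 10, ['window-A']),
                     incoming, window_fn))
try:
  process_output(ToyTimestampedValue(('k', 'v'), 10), incoming, window_fn)
except ValueError as e:
  print('ValueError:', e)
```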




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273909#comment-16273909
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264106
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+              | 'IdentityWindow' >> WindowInto(
+                  _IdentityWindowFn(
+                      windowing_saved.windowfn.get_window_coder()),
+                  trigger=AfterCount(1),
+                  accumulation_mode=AccumulationMode.DISCARDING,
+                  timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
+                  )
+              | 'GroupByKey' >> GroupByKey()
+              | 'ExpandIterable' >> FlatMap(
+                  lambda e: [(e[0], value) for value in e[1]])
+              | 'ReifyTimestampsExtract' >> ParDo(ReifyTimestampsExtract()))
+    result._windowing = windowing_saved
+    return result
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class Reshuffle(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  Reshuffle adds a temporary random key to each element, performs a
+  ReshufflePerKey, and finally removes the temporary key.
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    return (pcoll
+            | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+            | ReshufflePerKey()
+            | 'RemoveTempKeys' >> Map(lambda t: t[1]))
 
 Review comment:
   
   
   > **robertwb** wrote:
   > RemoveRandomKeys (for symmetry).
   
   
   Done.
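To make the add-keys, group, strip-keys shape of Reshuffle concrete, here is a pure-Python simulation with no runner involved. `reshuffle_per_key` and `simulate_reshuffle` are hypothetical helpers: a dict stands in for GroupByKey and elements are plain values, so the sketch only shows that the output is the same multiset as the input (order aside). It does not model the fusion-breaking, checkpointing or deduplication side effects that the docstring describes.

```python
import collections
import random


def reshuffle_per_key(pairs):
  """Group (key, value) pairs by key, then expand back to pairs.

  A dict plays the role of GroupByKey; the output order follows
  key grouping rather than input order.
  """
  grouped = collections.defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  # Equivalent of the FlatMap that expands each (key, iterable) group.
  return [(key, value) for key, values in grouped.items() for value in values]


def simulate_reshuffle(elements, rng=random):
  """AddRandomKeys -> ReshufflePerKey -> RemoveRandomKeys, in memory."""
  keyed = [(rng.getrandbits(32), e) for e in elements]  # AddRandomKeys
  shuffled = reshuffle_per_key(keyed)
  return [value for _, value in shuffled]               # RemoveRandomKeys


data = ['a', 'b', 'b', 'c', 'd']
out = simulate_reshuffle(data, random.Random(42))
print(sorted(out) == sorted(data))
```

Random keys spread elements across groups; even if two keys collide, both values survive the group-and-expand step, so no element is lost.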



[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273910#comment-16273910
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154264107
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > ApplyReifiedTimestamps?
   
   
   Done.
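The reify / apply pair discussed in this thread can be pictured without a runner. Timestamps are moved into the value before the grouping step (which, per the ReshufflePerKey code, combines timestamps with OUTPUT_AT_EARLIEST) and moved back out afterwards, so each element keeps its own timestamp. `Element`, `group_by_key`, `reify_timestamps`, `apply_reified_timestamps` and `expand_groups` below are hypothetical pure-Python helpers, not Beam transforms; plain tuples stand in for TimestampedValue, and windows are omitted.

```python
import collections

# (key, value, timestamp) triples stand in for timestamped elements.
Element = collections.namedtuple('Element', 'key value timestamp')


def group_by_key(elements):
  """Toy GroupByKey: one output per key, stamped with the earliest input
  timestamp (mimicking TimestampCombiner.OUTPUT_AT_EARLIEST)."""
  groups = collections.OrderedDict()
  for e in elements:
    values, ts = groups.get(e.key, ([], e.timestamp))
    values.append(e.value)
    groups[e.key] = (values, min(ts, e.timestamp))
  return [Element(k, vs, ts) for k, (vs, ts) in groups.items()]


def reify_timestamps(elements):
  # Copy each element's timestamp into its value so grouping cannot lose it.
  return [e._replace(value=(e.value, e.timestamp)) for e in elements]


def apply_reified_timestamps(elements):
  # Restore the per-element timestamp that was carried inside the value.
  return [Element(e.key, v, ts) for e in elements for v, ts in e.value]


def expand_groups(grouped):
  # Without reification: every value inherits the group's timestamp.
  return [Element(g.key, v, g.timestamp) for g in grouped for v in g.value]


inputs = [Element('k', 'a', 5), Element('k', 'b', 9)]
print(expand_groups(group_by_key(inputs)))  # 'b' is restamped to 5
print(apply_reified_timestamps(group_by_key(reify_timestamps(inputs))))
```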




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273679#comment-16273679
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r154235616
 
 

 ##
 File path: sdks/python/apache_beam/testing/util_test.py
 ##
 @@ -32,18 +37,56 @@ def test_assert_that_passes(self):
     with TestPipeline() as p:
       assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
 
+  def test_assert_that_passes_empty_equal_to(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), equal_to([]))
+
+  def test_assert_that_passes_empty_is_empty(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), is_empty())
+
+  def test_windowed_value_passes(self):
+    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
+                for v in [1, 2, 3]]
+    with TestPipeline() as p:
+      assert_that(p | Create([2, 3, 1]), equal_to(expected), reify_windows=True)
+
   def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
+    with self.assertRaises(BeamAssertException):
 
 Review comment:
   :/ I changed this because my code had a bug that raised another exception.


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271787#comment-16271787
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153939801
 
 

 ##
 File path: sdks/python/apache_beam/testing/util_test.py
 ##
 @@ -32,18 +37,56 @@ def test_assert_that_passes(self):
     with TestPipeline() as p:
       assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
 
+  def test_assert_that_passes_empty_equal_to(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), equal_to([]))
+
+  def test_assert_that_passes_empty_is_empty(self):
+    with TestPipeline() as p:
+      assert_that(p | Create([]), is_empty())
+
+  def test_windowed_value_passes(self):
+    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
+                for v in [1, 2, 3]]
+    with TestPipeline() as p:
+      assert_that(p | Create([2, 3, 1]), equal_to(expected), reify_windows=True)
+
   def test_assert_that_fails(self):
-    with self.assertRaises(Exception):
+    with self.assertRaises(BeamAssertException):
 
 Review comment:
   Not all runners preserve exceptions from remote workers. Please change back. 
(Same elsewhere.)
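Why `assertRaises(Exception)` is the portable choice can be shown with a toy runner. This sketch assumes a hypothetical `run_remotely` helper that, like some distributed runners, surfaces a worker failure as a different exception type; `BeamAssertException` here is a local stand-in, not the class imported from `apache_beam.testing.util`.

```python
import unittest


class BeamAssertException(Exception):
  """Local stand-in for apache_beam.testing.util.BeamAssertException."""


def run_remotely(fn):
  """Hypothetical runner: worker errors come back as RuntimeError, so the
  original exception type is lost and only the message survives."""
  try:
    fn()
  except Exception as e:
    raise RuntimeError('Worker failed: %s' % e)


def failing_assertion():
  raise BeamAssertException('Failed assert: [1, 2, 3] == [1, 2]')


class AssertThatFailsTest(unittest.TestCase):

  def test_generic_exception_is_portable(self):
    # Passes whether or not the runner preserves the exception type.
    with self.assertRaises(Exception):
      run_remotely(failing_assertion)

  def test_specific_exception_is_lost(self):
    # This runner surfaced a RuntimeError, so assertRaises(BeamAssertException)
    # would fail here even though the assertion itself did fire.
    with self.assertRaises(RuntimeError) as ctx:
      run_remotely(failing_assertion)
    self.assertNotIsInstance(ctx.exception, BeamAssertException)
    self.assertIn('Failed assert', str(ctx.exception))
```

Run it with `python -m unittest`. Catching the broad `Exception` trades precision for portability across runners; matching on the message is one possible middle ground.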


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271792#comment-16271792
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153940718
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
 
 Review comment:
   We shouldn't have to check for this. 
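For context on why the check may be unnecessary: `DoFn.TimestampParam` is a sentinel default that the SDK replaces with the element's actual timestamp before calling `process`, so the method should never observe the sentinel itself. The following is a simplified plain-Python model of that substitution; `TIMESTAMP_PARAM` and `invoke_process` are hypothetical and far simpler than Beam's own invoker.

```python
import inspect


class _Sentinel(object):
  """Marker object used as a default argument value."""

  def __init__(self, name):
    self.name = name

  def __repr__(self):
    return self.name


# Hypothetical stand-in for DoFn.TimestampParam.
TIMESTAMP_PARAM = _Sentinel('TimestampParam')


def invoke_process(process_fn, element, timestamp):
  """Calls process_fn, filling every sentinel-defaulted argument."""
  kwargs = {}
  for name, param in inspect.signature(process_fn).parameters.items():
    if param.default is TIMESTAMP_PARAM:
      kwargs[name] = timestamp
  return list(process_fn(element, **kwargs))


def reify_timestamps(element, timestamp=TIMESTAMP_PARAM):
  # No sentinel check needed: the invoker always supplies a real timestamp.
  key, value = element
  yield key, (value, timestamp)


print(invoke_process(reify_timestamps, ('k', 'v'), 42))  # [('k', ('v', 42))]
```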


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271790#comment-16271790
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153947324
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
 
 Review comment:
   Most of these explicit stage names are redundant with what the default would 
already be. 


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271791#comment-16271791
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153941099
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
 
 Review comment:
   ApplyReifiedTimestamps?


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271789#comment-16271789
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153947424
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
+        # value.
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    windowing_saved = pcoll.windowing
+    result = (pcoll
+              | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+              | 'IdentityWindow' >> WindowInto(
+                  _IdentityWindowFn(
+                      windowing_saved.windowfn.get_window_coder()),
+                  trigger=AfterCount(1),
+                  accumulation_mode=AccumulationMode.DISCARDING,
+                  timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
+                  )
+              | 'GroupByKey' >> GroupByKey()
+              | 'ExpandIterable' >> FlatMap(
+                  lambda e: [(e[0], value) for value in e[1]])
+              | 'ReifyTimestampsExtract' >> ParDo(ReifyTimestampsExtract()))
+    result._windowing = windowing_saved
+    return result
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class Reshuffle(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  Reshuffle adds a temporary random key to each element, performs a
+  ReshufflePerKey, and finally removes the temporary key.
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    return (pcoll
+            | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+            | ReshufflePerKey()
+            | 'RemoveTempKeys' >> Map(lambda t: t[1]))
 
 Review comment:
   RemoveRandomKeys (for symmetry).
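The symmetric naming mirrors the steps of `Reshuffle.expand` in the diff. As a rough plain-Python model (no Beam; the shuffle is simulated with a dictionary, and a seeded `random.Random` stands in for `random.getrandbits(32)`), the transform is an identity on the multiset of elements, and only grouping and order may change:

```python
import random
from collections import defaultdict


def reshuffle(elements, seed=None):
  """Plain-Python model of Reshuffle; returns the same multiset of elements."""
  rng = random.Random(seed)
  # AddRandomKeys: pair each element with a random 32-bit key.
  keyed = [(rng.getrandbits(32), e) for e in elements]
  # GroupByKey (simulated): collect the values for each key.
  groups = defaultdict(list)
  for key, value in keyed:
    groups[key].append(value)
  # ExpandIterable: re-emit one (key, value) pair per grouped value.
  expanded = [(key, v) for key, values in groups.items() for v in values]
  # RemoveRandomKeys: drop the temporary key.
  return [v for _, v in expanded]


data = ['a', 'b', 'c', 'a']
out = reshuffle(data, seed=0)
print(sorted(out) == sorted(data))  # True
```

In a real pipeline the benefit comes from the shuffle boundary itself (breaking fusion, redistributing work), which this in-memory model cannot show.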


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrast

[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271788#comment-16271788
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153941071
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
 Review comment:
   I'm not following this comment... 
   
   Shouldn't things already be in the correct window? We just need to emit a 
TimestampedValue here. 
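The distinction in question can be modeled in a few lines. Assuming a hypothetical `AssignContext` namedtuple that carries the element's current window (mirroring the `assign_context.window` attribute that `_IdentityWindowFn.assign` reads in the diff), an identity window function keeps that window, whereas a fixed-window function recomputes a window from the timestamp alone. Both classes below are illustrative sketches, not Beam's implementations.

```python
from collections import namedtuple

# Hypothetical stand-in for WindowFn.AssignContext.
AssignContext = namedtuple('AssignContext', ['timestamp', 'element', 'window'])


class IdentityWindowFnSketch(object):
  """Re-assigns each element to the window it already belongs to."""

  def assign(self, context):
    if context.window is None:
      raise ValueError('assign_context.window should not be None.')
    return [context.window]


class FixedWindowFnSketch(object):
  """Assigns [start, start + size) windows from the timestamp alone."""

  def __init__(self, size):
    self.size = size

  def assign(self, context):
    start = context.timestamp - context.timestamp % self.size
    return [(start, start + self.size)]


# An element at timestamp 5 that sits in a merged, session-like window [3, 37).
ctx = AssignContext(timestamp=5, element='x', window=(3, 37))
print(IdentityWindowFnSketch().assign(ctx))  # [(3, 37)]
print(FixedWindowFnSketch(10).assign(ctx))   # [(0, 10)]
```

Windows produced by a merging or otherwise data-dependent WindowFn generally cannot be recomputed from the timestamp, which is one reason to carry the existing window through the shuffle rather than re-derive it.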


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271786#comment-16271786
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

robertwb commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153940603
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
 
 Review comment:
   Reify means to make explicit. Should just be ReifyTimestamps.


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271349#comment-16271349
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on issue #4040: [BEAM-1872] Add IdentityWindowFn for use in 
Reshuffle
URL: https://github.com/apache/beam/pull/4040#issuecomment-347960498
 
 
   retest this please


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269819#comment-16269819
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153664972
 
 

 ##
 File path: sdks/python/apache_beam/testing/util.py
 ##
 @@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
 
 Review comment:
   done.


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269680#comment-16269680
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653663
 
 

 ##
 File path: sdks/python/apache_beam/testing/util.py
 ##
 @@ -46,6 +56,26 @@ class BeamAssertException(Exception):
   pass
 
 
+# Used for reifying timestamps and windows for assert_that matchers.
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Ideally one should be able to use any callable, e.g. hamcrest matchers, 
rather than have to implement a windowed variant of each. I misspoke about 
having a windowed_equals_to, we should have a `assert_that_windowed(pcoll, 
equal_to([WindowedValue(...), ...]))`, or `assert_that(pcoll, 
equal_to([WindowedValue(...), ...], reify_windows=True)`.
   
   
   Done.
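A rough model of the `reify_windows=True` behavior agreed on here: the matcher stays an ordinary callable, so `equal_to` (or any other callable matcher) works unchanged, and only the values handed to it differ. `assert_that_sketch`, `equal_to_sketch` and this `TestWindowedValue` namedtuple are hypothetical stand-ins for illustration; the real `assert_that` runs inside a pipeline.

```python
from collections import namedtuple

# Hypothetical stand-in for the TestWindowedValue used in util_test.py.
TestWindowedValue = namedtuple(
    'TestWindowedValue', ['value', 'timestamp', 'windows'])


def equal_to_sketch(expected):
  """Order-insensitive matcher that works on any comparable items."""
  def _check(actual):
    if sorted(actual) != sorted(expected):
      raise AssertionError('Failed assert: %r == %r' % (expected, actual))
  return _check


def assert_that_sketch(windowed_values, matcher, reify_windows=False):
  """Applies matcher to bare values, or to full windowed triples."""
  if reify_windows:
    matcher([TestWindowedValue(*wv) for wv in windowed_values])
  else:
    matcher([wv[0] for wv in windowed_values])


# (value, timestamp, windows) triples, as a runner might hold them.
actual = [(2, 0, ('global',)), (3, 0, ('global',)), (1, 0, ('global',))]
assert_that_sketch(actual, equal_to_sketch([1, 2, 3]))
expected = [TestWindowedValue(v, 0, ('global',)) for v in [1, 2, 3]]
assert_that_sketch(actual, equal_to_sketch(expected), reify_windows=True)
```

Keeping the reification in `assert_that` rather than in each matcher avoids writing a windowed variant of every matcher.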


[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269688#comment-16269688
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653666
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
   self._batch_size_estimator))
 else:
   return pcoll | 
ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
+    pcoll_intermediate = (pcoll
+        | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+        | 'IdentityWindow' >> WindowInto(
+            IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+            trigger=TriggerForEveryElement(),
+            accumulation_mode=AccumulationMode.DISCARDING,
+            # TODO: timestamp_combiner=
 
 Review comment:
   
   
   > **robertwb** wrote:
   > This must be the min timestamp combiner.
   
   
   Done, but I don't know how to test this. That is, nothing prevents the 
reshuffle code from moving the timestamps forwards and then back again 
when it restores them. The windows are preserved, so temporarily having the 
wrong timestamps during Reshuffle doesn't seem to change windowing either.
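A toy model makes the observation above concrete (hypothetical helpers, integer timestamps, a dict in place of the shuffle): because each element's timestamp is reified into its value and restored afterwards, the final output is identical whichever timestamp the grouped pane carried. What the combiner does decide is the pane's own timestamp. One hedged reading of the "min" requirement is that with `min` the pane is never later than any element it holds, so restoring the original timestamps never has to move any of them earlier than the pane; with `max`, some would have to move backwards. That invariant on the grouped output, rather than the final result, may be the more testable property.

```python
from collections import defaultdict


def reify_timestamps(elements):
  """(key, value, ts) -> (key, (value, ts), ts): copy each timestamp into the value."""
  return [(key, (value, ts), ts) for key, value, ts in elements]


def group_by_key(elements, combine_timestamps=min):
  """Toy GroupByKey returning (key, values, group_timestamp) per key.

  The group's timestamp is combine_timestamps() over its elements' timestamps,
  standing in for the timestamp combiner (min ~ earliest).
  """
  values, timestamps = defaultdict(list), defaultdict(list)
  for key, value, ts in elements:
    values[key].append(value)
    timestamps[key].append(ts)
  return [(key, values[key], combine_timestamps(timestamps[key]))
          for key in values]


def expand_and_restore(grouped):
  """Emits one (key, value, ts) per grouped value, restoring its original ts."""
  return [(key, value, original_ts)
          for key, reified, _ in grouped
          for value, original_ts in reified]


elements = [('a', 1, 5), ('a', 2, 3), ('b', 3, 7)]
grouped = group_by_key(reify_timestamps(elements))
# The round trip is lossless whatever timestamp the groups carried.
assert sorted(expand_and_restore(grouped)) == sorted(elements)
```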



[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269683#comment-16269683
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653670
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > As mentioned, rename this to ReshufflePerKey, and add a Reshuffle that 
appends and then strips a random key (e.g. random.getrandbits(32)).
   
   
   Done.
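A plain-Python model of the requested split: `reshuffle_per_key` groups key/value pairs and expands each group back into pairs, and `reshuffle` wraps it by attaching a random key from `random.getrandbits(32)` and stripping it afterwards. A dict stands in for the runner's shuffle and the function names are hypothetical, so this only illustrates the data flow, not the Beam implementation.

```python
import random
from collections import defaultdict


def reshuffle_per_key(kvs):
  """Groups (key, value) pairs by key, then expands each group back to pairs."""
  groups = defaultdict(list)
  for key, value in kvs:
    groups[key].append(value)
  return [(key, value) for key, values in groups.items() for value in values]


def reshuffle(elements, rng=random):
  """Attaches a random 32-bit key, reshuffles per key, then drops the key."""
  keyed = [(rng.getrandbits(32), element) for element in elements]
  return [element for _, element in reshuffle_per_key(keyed)]


data = list(range(10))
# The same elements come out; only how they were grouped in between changes.
assert sorted(reshuffle(data, rng=random.Random(0))) == data
```

Random keys spread elements across many groups, which is presumably why the keyless variant can redistribute data even when the input has no useful key of its own.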




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269687#comment-16269687
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653668
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+class TriggerForEveryElement(TriggerFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Just use AfterCount(1).
   
   
   Done.
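The intent of a per-element trigger with discarding accumulation is that elements leave the grouping in small panes as they arrive, instead of waiting for the window to close. The toy model below (a hypothetical helper, not Beam's trigger machinery) shows the panes produced by an element-count trigger that keeps firing; it deliberately ignores watermarks, window expiry, and whether `AfterCount` finishes after its first firing, all of which are governed by Beam's own trigger semantics.

```python
def fire_after_count(elements, count=1):
  """Toy element-count trigger in discarding mode.

  Emits a pane each time `count` buffered elements have arrived, then clears
  the buffer so fired elements are not repeated in later panes. Elements left
  in the buffer at the end are not emitted here (a real runner would flush
  them when the window closes).
  """
  panes, buffer = [], []
  for element in elements:
    buffer.append(element)
    if len(buffer) >= count:
      panes.append(list(buffer))
      buffer = []  # discarding: start the next pane empty
  return panes


# With a count of 1, every element becomes its own pane.
assert fire_after_count(['a', 'b', 'c']) == [['a'], ['b'], ['c']]
```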




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269679#comment-16269679
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653661
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > A DoFn with nothing but a process method can be more simply implemented 
via beam.Map or beam.FlatMap.
   
   
   Done.
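For illustration, the body of `ExpandIterableDoFn.process` can be written as a plain function and handed to a FlatMap; in the pipeline that would read roughly `'ExpandIterable' >> beam.FlatMap(expand_iterable)`. The `flat_map` helper below is a hypothetical stand-in for `beam.FlatMap` so the snippet runs without Beam installed.

```python
def expand_iterable(element):
  """(key, [v1, v2, ...]) -> [(key, v1), (key, v2), ...]."""
  key, values = element
  return [(key, value) for value in values]


def flat_map(fn, elements):
  """Stand-in for FlatMap: apply fn to each element and flatten the results."""
  return [output for element in elements for output in fn(element)]


grouped = [('a', [1, 2]), ('b', [3])]
assert flat_map(expand_iterable, grouped) == [('a', 1), ('a', 2), ('b', 3)]
```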




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269685#comment-16269685
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653669
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
 
 Review comment:
   
   
   > **robertwb** wrote:
   > It should be able to infer.
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269689#comment-16269689
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653665
 
 

 ##
 File path: sdks/python/apache_beam/pvalue.py
 ##
 @@ -124,6 +124,11 @@ def windowing(self):
   self.producer.inputs)
 return self._windowing
 
+  # TODO(ehudm): Make this internal.
+  @windowing.setter
 
 Review comment:
   
   
   > **robertwb** wrote:
   > As discussed, remove this setter.
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269682#comment-16269682
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653664
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Rename this window_coder?
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269681#comment-16269681
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653662
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+class IdentityWindowFn(NonMergingWindowFn):
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Make this private (with an underscore)?
   
   
   Done.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269684#comment-16269684
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653671
 
 

 ##
 File path: sdks/python/apache_beam/transforms/util.py
 ##
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
+    pcoll_intermediate = (pcoll
+        | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+        | 'IdentityWindow' >> WindowInto(
+            IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+            trigger=TriggerForEveryElement(),
+            accumulation_mode=AccumulationMode.DISCARDING,
+            # TODO: timestamp_combiner=
+        )
+        | 'GroupByKey' >> GroupByKey()
+        | 'ExpandIterable' >> ParDo(ExpandIterableDoFn()))
+    pcoll_intermediate.windowing = windowing_saved
 
 Review comment:
   
   
   > **robertwb** wrote:
   > Nit: I'd probably assign the saved windowing as the very last step rather 
than on this intermediate PCollection.
   
   
   Done.



[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269686#comment-16269686
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

udim commented on a change in pull request #4040: [BEAM-1872] Add 
IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653667
 
 

 ##
 File path: sdks/python/apache_beam/testing/util.py
 ##
 @@ -80,9 +125,11 @@ def assert_that(actual, matcher, label='assert_that'):
 
   Args:
     actual: A PCollection.
-    matcher: A matcher function taking as argument the actual value of a
-      materialized PCollection. The matcher validates this actual value against
-      expectations and raises BeamAssertException if they are not met.
+    matcher: Two options:
+      1. A matcher function taking as argument the actual value of a
+      materialized PCollection. The matcher validates this actual value
+      against expectations and raises BeamAssertException if they are not met.
+      2. An instance of WindowedValueMatcher.
 
 Review comment:
   
   
   > **udim** wrote:
   > I thought of doing that, but how would assert_that know whether the matcher 
accepts plain values or TestWindowedValues?
   
   
   Acknowledged.




[jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219846#comment-16219846
 ] 

ASF GitHub Bot commented on BEAM-1872:
--

GitHub user udim opened a pull request:

https://github.com/apache/beam/pull/4040

[BEAM-1872] Add IdentityWindowFn for use in Reshuffle

BEAM-1872 IdentityWindowFn is intended for internal use in a future
implementation of Reshuffle.

Add and pass current window to WindowFn.AssignContext, for
IdentityWindowFn implementation.

Add WindowedValueMatcher in testing/util.py, for use in IdentityWindowFn
unit tests.
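A Beam-free sketch of how these pieces fit together: the context passed to `assign` now carries the element's current window, and the identity WindowFn simply returns it. It mirrors the `IdentityWindowFn` under review (including the later `window_coder` rename), but `AssignContext` and `IdentityWindowFnSketch` below are minimal stand-ins without Beam's base classes, and the coder is only a placeholder object.

```python
class AssignContext(object):
  """Minimal stand-in for WindowFn.AssignContext, now carrying the window."""

  def __init__(self, timestamp, element=None, window=None):
    self.timestamp = timestamp
    self.element = element
    self.window = window


class IdentityWindowFnSketch(object):
  """Keeps every element in the window it is already assigned to."""

  def __init__(self, window_coder):
    if window_coder is None:
      raise ValueError('window_coder should not be None')
    self._window_coder = window_coder

  def assign(self, assign_context):
    if assign_context.window is None:
      raise ValueError(
          'assign_context.window should not be None. '
          'This might be due to a DoFn returning a TimestampedValue.')
    return [assign_context.window]

  def get_window_coder(self):
    return self._window_coder


fn = IdentityWindowFnSketch(window_coder=object())  # placeholder coder
assert fn.assign(AssignContext(10, element='x', window=(0, 60))) == [(0, 60)]
```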



Hi @robertwb. Can you please take a look?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/udim/beam reshuffle_py

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/4040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4040


commit cb5bd6dbba492221e74889c70a503c2544a828c5
Author: Udi Meiri 
Date:   2017-10-25T22:45:39Z

Add IdentityWindowFn.

BEAM-1872 IdentityWindowFn is intended for internal use in a future
implementation of Reshuffle.

Add and pass current window to WindowFn.AssignContext, for
IdentityWindowFn implementation.

Add WindowedValueMatcher in testing/util.py, for use in IdentityWindowFn
unit tests.




> implement Reshuffle transform in python, make it experimental in Java
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core, sdk-py-core
>Reporter: Ahmet Altay
>  Labels: sdk-consistency
>






[jira] [Commented] (BEAM-1872) implement Reshuffle transform

2017-10-04 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191854#comment-16191854
 ] 

Ahmet Altay commented on BEAM-1872:
---

cc: [~kirpichov]

Reshuffle in Java is still being maintained and getting new features (e.g. 
{{ViaRandomKey}}) 
(https://github.com/apache/beam/blob/9379ca289a00cae169075728e6230a1d4a317659/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java)

It makes sense to make {{Reshuffle}} experimental in both SDKs and to implement it 
for Python as well.

> implement Reshuffle transform
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>  Labels: sdk-consistency
>






[jira] [Commented] (BEAM-1872) implement Reshuffle transform

2017-08-09 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120826#comment-16120826
 ] 

Chamikara Jayalath commented on BEAM-1872:
--

Due to the complexities mentioned in the following links, I don't think this should 
be a newbie/starter issue, so I removed those tags.

https://github.com/apache/beam/pull/1036
https://lists.apache.org/thread.html/ac34c9ac665a8d9f67b0254015e44c59ea65ecc1360d4014b95d3b2e@%3Cdev.beam.apache.org%3E

> implement Reshuffle transform
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Ahmet Altay
>  Labels: sdk-consistency
>






[jira] [Commented] (BEAM-1872) implement Reshuffle transform

2017-08-09 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120824#comment-16120824
 ] 

Chamikara Jayalath commented on BEAM-1872:
--

I think it's good to keep the JIRA around, since this is currently a feature 
available in the Java SDK but not in the Python SDK. I don't think it will be 
removed from Java without a replacement, and at that point we should add a 
Python SDK version as well. We should keep this JIRA open until Reshuffle is 
actually removed from the Java SDK.

> implement Reshuffle transform
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Ahmet Altay
>  Labels: sdk-consistency
>






[jira] [Commented] (BEAM-1872) implement Reshuffle transform

2017-05-17 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014360#comment-16014360
 ] 

Ahmet Altay commented on BEAM-1872:
---

Updated link: 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java

Reshuffle in Java is internal and deprecated; perhaps we can drop this issue.

> implement Reshuffle transform
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Ahmet Altay
>  Labels: newbie, sdk-consistency, starter
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1872) implement Reshuffle transform

2017-04-03 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954454#comment-15954454
 ] 

Ahmet Altay commented on BEAM-1872:
---

Similar to the Java implementation at 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java

> implement Reshuffle transform
> -
>
> Key: BEAM-1872
> URL: https://issues.apache.org/jira/browse/BEAM-1872
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Ahmet Altay
>  Labels: newbie, sdk-consistency, starter
>



