[jira] [Commented] (BEAM-14497) Python Reshuffle holds elements

2022-06-04 Thread Danny McCormick (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550021#comment-17550021
 ] 

Danny McCormick commented on BEAM-14497:


This issue has been migrated to https://github.com/apache/beam/issues/21591

> Python Reshuffle holds elements
> ---
>
> Key: BEAM-14497
> URL: https://issues.apache.org/jira/browse/BEAM-14497
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Yi Hu
>Priority: P2
>
> Python Reshuffle holds elements while the pipeline is running and appears to 
> release them in a batch. In contrast, Java Reshuffle triggers on every 
> element, as noted in its documentation:
> "the trigger used with {@link Reshuffle} which triggers on every element and 
> never buffers state."
> Here is a working example:
> {code:python}
> import time
>
> import apache_beam as beam
> from apache_beam import Pipeline
>
> def test(p: Pipeline):
>   class SlowProcessFn(beam.DoFn):
>     def process(self, element):
>       time.sleep(0.5)  # simulate slow per-element work
>       yield element
>
>   result = (p
>     | beam.Create(range(100))
>     | beam.ParDo(SlowProcessFn())
>     | beam.Reshuffle()  # HERE
>     | beam.Map(lambda x: print(x, time.time())))
>   return result
> {code}
> Tested on the local runner and the Flink runner (1.14): the elements are all 
> printed after about 50 seconds. With Reshuffle commented out, one element is 
> printed every half second.
> This behavior causes problems when a downstream PTransform performs a 
> time-sensitive operation, such as receiving a list of updated files as input 
> and reading them with the filebasedsource.ReadAllFiles transform. Because 
> ReadAll contains a Reshuffle, the actual read is blocked.
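
The 50-second figure in the report matches 100 elements at 0.5 s each. The following is a minimal plain-Python sketch of that arithmetic, not Beam code: it uses a logical clock instead of real sleeps, and the helper names (slow_source, pass_through, buffer_all) are hypothetical, introduced only for illustration.

```python
from typing import Iterable, Iterator, Tuple

Timed = Tuple[int, float]  # (element, logical time at which it becomes visible)


def slow_source(n: int, cost: float) -> Iterator[Timed]:
    """Yield n elements; each one takes `cost` logical seconds to produce."""
    clock = 0.0
    for i in range(n):
        clock += cost
        yield i, clock


def pass_through(stream: Iterable[Timed]) -> Iterator[Timed]:
    """Model of a per-element trigger: forward each element as soon as it is ready."""
    for element, ready_at in stream:
        yield element, ready_at


def buffer_all(stream: Iterable[Timed]) -> Iterator[Timed]:
    """Model of the reported behavior: hold everything, release after the last element."""
    held = list(stream)
    release_at = held[-1][1] if held else 0.0
    for element, _ in held:
        yield element, release_at


streaming = list(pass_through(slow_source(100, 0.5)))
batched = list(buffer_all(slow_source(100, 0.5)))

print("per-element: first seen at", streaming[0][1], "s")
print("buffered:    first seen at", batched[0][1], "s")
```

In this model the first element is visible downstream after 0.5 s in the per-element case, but only after 50.0 s when the stage buffers, which is the delay a time-sensitive downstream step would experience.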



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (BEAM-14497) Python Reshuffle holds elements

2022-05-21 Thread Yi Hu (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17540510#comment-17540510
 ] 

Yi Hu commented on BEAM-14497:
--

From testing, I found that the Always() trigger does not actually work: using 
an Always() trigger still causes GroupByKey() to group elements. Java's 
Reshuffle has a ReshuffleTrigger, and I tested that using it with GroupByKey 
does not group elements.

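{code:python}
import time

import apache_beam as beam
from apache_beam import GroupByKey, Pipeline, WindowInto
from apache_beam.transforms.trigger import AccumulationMode, Always
from apache_beam.transforms.window import (
    GlobalWindows, TimestampCombiner, TimestampedValue)

def test(p: Pipeline):
  result = (p
    | beam.Create([('A', 1), ('B', 1), ('C', 1), ('A', 2)])
    | 'Timestamped' >> beam.Map(
        lambda x: TimestampedValue(x, timestamp=time.time()))
    # Always() is expected to fire a pane for every element.
    | WindowInto(GlobalWindows(), trigger=Always(),
        accumulation_mode=AccumulationMode.DISCARDING,
        timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST,
        allowed_lateness=1000)
    | GroupByKey()
    | beam.Map(print))
  return result
{code}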
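
To make the difference concrete, here is a plain-Python model (not Beam code) of the two outcomes for the four input pairs above. It assumes that an every-element trigger in discarding mode would emit one pane per input, which is how I read the comment. The function names are hypothetical, and actual output order on a runner may differ.

```python
from collections import defaultdict
from typing import Hashable, Iterable, List, Tuple

KV = Tuple[Hashable, int]
Pane = Tuple[Hashable, List[int]]


def fire_per_element(elements: Iterable[KV]) -> List[Pane]:
    """Assumed expectation: each input produces its own single-value pane."""
    return [(key, [value]) for key, value in elements]


def fire_once_grouped(elements: Iterable[KV]) -> List[Pane]:
    """Reported observation: values are collected per key and emitted together."""
    grouped = defaultdict(list)
    for key, value in elements:
        grouped[key].append(value)
    return list(grouped.items())


data = [('A', 1), ('B', 1), ('C', 1), ('A', 2)]

expected = fire_per_element(data)
observed = fire_once_grouped(data)

print("expected:", expected)
print("observed:", observed)
```

The model yields four panes in the per-element case and three in the grouped case, with key 'A' carrying both values; the second shape is what the comment reports seeing from GroupByKey under Always().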



