[ 
https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-6177:
--------------------------------
    Labels: stale-P2  (was: )

> AfterProcessingTime not firing
> ------------------------------
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P2
>              Labels: stale-P2
>
> Hi,
>  
> Documentation says that an AfterProcessingTime(X) trigger should fire X 
> seconds after the first element is processed, but it appears that this 
> trigger never fires when using a Global window on a steady influx of elements.
> Here is my pipeline:
>  
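> {code:python}
> import apache_beam as beam
> from apache_beam.transforms import window
> from apache_beam.transforms.trigger import (
>     AccumulationMode, AfterProcessingTime, Repeatedly)
>
> # converter, PubSubRow, ListCombineFn and ClickHouseSink are not Beam names;
> # they are assumed to come from the reporter's own project.
> (p
>  | 'pubsub' >> beam.io.ReadFromPubSub(
>      topic=self._pubsub_topic).with_output_types(bytes)
>  | 'window' >> beam.WindowInto(
>      window.GlobalWindows(),
>      trigger=Repeatedly(AfterProcessingTime(5)),
>      accumulation_mode=AccumulationMode.DISCARDING)
>  | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
>  | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
>  | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
>  | 'clickhouse' >> ClickHouseSink(
>      self._clickhouse_host, self._clickhouse_port, self._clickhouse_database)
> )
> {code}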
>  
>  
> I expected the trigger to fire every 5 seconds (as long as elements keep 
> arriving) and my data to be combined at each firing. The idea of this 
> pipeline is simply to get messages from PubSub, transform them into 
> ClickHouse ORM models and then batch save them into ClickHouse, using as 
> much parallelism as possible. We do not care about ordering: elements can be 
> inserted in any order and are not correlated to one another.
> The potential issue is in _class 
> AfterProcessingTime(TriggerFn)::on_element(self, element, window, context) in 
> trigger.py_:
>  
> {code:python}
> context.set_timer(
>     '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
> {code}
> This overwrites the previously set timer every time a new element comes in, 
> so with a constant influx of elements the deadline keeps moving forward and 
> the trigger only fires once no element has arrived for X seconds.
>  
>  
> Please let me know if I understood the documentation correctly, and whether 
> I can help further.
>  
> Thank you,
>  
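
The timer behaviour described in the report can be illustrated with a small pure-Python model. This is only a sketch and not Beam code: `fire_times` and its parameters are hypothetical names introduced here, and the model assumes a single window with one processing-time timer. It compares re-arming the timer on every element (what the quoted `on_element` snippet appears to do) with arming it only when no timer is pending (what the reporter expected from the documentation).

```python
def fire_times(arrivals, delay, reset_on_every_element):
    """Model one processing-time timer for a single window.

    arrivals: sorted element arrival times, in seconds.
    delay: timer delay in seconds (the X in AfterProcessingTime(X)).
    reset_on_every_element: if True, each element overwrites the pending
        timer; if False, a timer is only set when none is pending.
    Returns the list of times at which the timer fires.
    """
    fired = []
    deadline = None  # time of the pending timer, or None if no timer is set
    for t in arrivals:
        # A pending timer whose deadline has passed fires before this element.
        if deadline is not None and deadline <= t:
            fired.append(deadline)
            deadline = None
        # Arm (or re-arm) the timer relative to this element's arrival.
        if reset_on_every_element or deadline is None:
            deadline = t + delay
    # Once elements stop arriving, any pending timer eventually fires.
    if deadline is not None:
        fired.append(deadline)
    return fired


# One element per second for 20 seconds, with a 5-second delay.
arrivals = list(range(20))
print(fire_times(arrivals, 5, reset_on_every_element=True))   # [24]
print(fire_times(arrivals, 5, reset_on_every_element=False))  # [5, 10, 15, 20]
```

Under this model, re-arming on every element yields a single firing 5 seconds after the last element, which matches the symptom in the report, while arming only when idle yields a firing roughly every 5 seconds.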



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
