Sorry you hit this issue. Implementation of this feature has been marked in progress [1] for a while. It looks to be stalled so I unassigned the ticket. There is not any explicit runner support, yet, though the existing implementation is clever enough that it may automatically work for many runners.
Kenn [1] https://issues.apache.org/jira/browse/BEAM-1589 On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote: > I've noticed this doesn't seem to work either. The workaround is to just > schedule an event-time timer at the end of the window + allowed lateness. > The built-in GroupIntoBatches transform [1] does just this, I suspect to > work around the issue as well. > > [1] > https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167 > > On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere < > [email protected]> wrote: > >> Hello dear Beam community, >> Sorry, I sent this email on dev list first but it’s a user support >> question... >> I would like to write to you for a question about OnWindowExpiration >> annotation on DoFn. >> Does anyone of you have a working snippet with this ? >> >> I try to write a DoFn with a Batch RPC on window closure. It is a >> BigQuery call for a historical metric value updated by an external process. >> I want to execute this query and sum the results with my events buffered in >> a state. The OnWindowExpiration looks very practical to accomplish this. >> >> It looks like the function annotated with @OnWindowExpiration is never >> call. My pipeline runs on Dataflow, perhaps its not a supported feature on >> this runner… >> >> Here is a snippet of what I try to accomplish. It seems like the >> annotated functions is never called, the log line is never appearing. Am I >> missing something ? >> I tried to replicate the logic found in this blog post >> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and >> pieces of information found in this PR. >> <https://github.com/apache/beam/pull/4482> >> >> >> // The window definition used in the pipeline sets in a higher transform >> // Window<KV<String, Long>> w = >> // Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1L))) >> // .withAllowedLateness(Duration.ZERO) >> // .discardingFiredPanes(); >> >> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> >> { >> >> @StateId("buffer") >> private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = >> StateSpecs.bag(); >> >> @ProcessElement >> public void process( >> final ProcessContext context, >> final @StateId("buffer") BagState<KV<String, Long>> bufferState) { >> bufferState.add(context.element()); >> context.output(context.element()); >> } >> >> @OnWindowExpiration >> public void onWindowExpiration( >> final @StateId("buffer") BagState<KV<String, Long>> bufferState, >> final OutputReceiver<KV<String, Long>> outputReceiver) { >> LOG <http://LOG.info>. <http://LOG.info>info <http://LOG.info>("The >> window expired"); >> for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) >> { >> outputReceiver.output(enrichedEvent); >> } >> } >> } >> >> >> Thanks for your help, >> >> >> Augustin >> >> Chauffeur Privé devient kapten_ Plus d'informations ici >> <https://www.kapten.com/fr/manifesto.html> >> >
