I've noticed that @OnWindowExpiration doesn't seem to work either. The workaround is to schedule an event-time timer for the end of the window plus the allowed lateness. The built-in GroupIntoBatches transform [1] does exactly this, I suspect to work around the same issue.
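To make "end of the window + allowed lateness" concrete, here is a minimal plain-Java sketch of the timestamp arithmetic only. It is not Beam code: the class and method names are hypothetical, and it assumes fixed windows aligned to the epoch with millisecond resolution. In a real DoFn the same value would, as far as I can tell from [1], come from window.maxTimestamp().plus(allowedLateness) and be passed to an event-time timer's set(...).

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative arithmetic only; names here are hypothetical, not part of the Beam API. */
final class WindowExpiry {

  /** Start of the fixed window (assumed aligned to the epoch) that contains eventTime. */
  static Instant windowStart(Instant eventTime, Duration windowSize) {
    long sizeMillis = windowSize.toMillis();
    long eventMillis = eventTime.toEpochMilli();
    // floorMod keeps the alignment correct for timestamps before the epoch.
    return Instant.ofEpochMilli(eventMillis - Math.floorMod(eventMillis, sizeMillis));
  }

  /** Last timestamp still inside the window: one millisecond before its exclusive end. */
  static Instant maxTimestamp(Instant eventTime, Duration windowSize) {
    return windowStart(eventTime, windowSize).plus(windowSize).minusMillis(1);
  }

  /** Event time at which the flush timer should fire: max timestamp plus allowed lateness. */
  static Instant timerFiringTime(
      Instant eventTime, Duration windowSize, Duration allowedLateness) {
    return maxTimestamp(eventTime, windowSize).plus(allowedLateness);
  }

  public static void main(String[] args) {
    // One-minute fixed windows with zero allowed lateness, as in the quoted pipeline.
    Instant event = Instant.parse("2019-02-25T15:24:42Z");
    System.out.println(timerFiringTime(event, Duration.ofMinutes(1), Duration.ZERO));
  }
}
```

With zero allowed lateness the timer lands on the window's last millisecond, so it fires once the watermark passes the end of the window. A non-zero lateness pushes it out by that amount, so late elements are still in the buffer when it fires.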

[1] https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167

On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <[email protected]> wrote:

> Hello dear Beam community,
> Sorry, I sent this email to the dev list first, but it's a user support
> question...
> I am writing with a question about the OnWindowExpiration annotation on
> DoFn. Does any of you have a working snippet that uses it?
>
> I am trying to write a DoFn that makes a batch RPC on window closure. It is
> a BigQuery call for a historical metric value updated by an external
> process. I want to execute this query and sum the results with my events
> buffered in a state. OnWindowExpiration looks very practical for this.
>
> It looks like the function annotated with @OnWindowExpiration is never
> called. My pipeline runs on Dataflow; perhaps it is not a supported feature
> on this runner…
>
> Here is a snippet of what I am trying to accomplish. It seems the annotated
> function is never called: the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
> pieces of information found in this PR
> <https://github.com/apache/beam/pull/4482>.
>
>
> // The window definition used in the pipeline, set in an upstream transform:
> // Window<KV<String, Long>> w =
> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
> //         .withAllowedLateness(Duration.ZERO)
> //         .discardingFiredPanes();
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
>
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
>
>
> Thanks for your help,
>
>
> Augustin
>
> Chauffeur Privé becomes kapten_. More information here
> <https://www.kapten.com/fr/manifesto.html>
>
