I've noticed that @OnWindowExpiration doesn't seem to work either.  The
workaround is to schedule an event-time timer for the end of the window
plus the allowed lateness.  The built-in GroupIntoBatches transform [1]
does exactly this, I suspect to work around the same issue.

[1]
https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
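
For what it's worth, here is a rough sketch of that workaround applied to a
DoFn shaped like the one in your mail. It is untested against Dataflow and
makes a few assumptions: the class name EnrichWithTimer, the timer id
"windowEnd", the expirationTime helper, and passing the allowed lateness in
through the constructor are all my own choices for illustration, not
something taken from GroupIntoBatches. The BigQuery enrichment from your
snippet would go inside the timer callback; I have left it out and simply
emit the buffered events.

```java
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical variant of the Enrich DoFn that uses an event-time timer
// instead of @OnWindowExpiration.
public final class EnrichWithTimer extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = StateSpecs.bag();

  // Event-time timer; the id "windowEnd" is an arbitrary name.
  @TimerId("windowEnd")
  private final TimerSpec windowEndTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  // Assumption: must match the allowed lateness configured on the Window transform.
  private final Duration allowedLateness;

  public EnrichWithTimer(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  // End of the window plus allowed lateness, i.e. the moment the window expires.
  static Instant expirationTime(BoundedWindow window, Duration allowedLateness) {
    return window.maxTimestamp().plus(allowedLateness);
  }

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @StateId("buffer") BagState<KV<String, Long>> bufferState,
      @TimerId("windowEnd") Timer timer) {
    // Setting the timer again for the same key and window just overwrites it.
    timer.set(expirationTime(window, allowedLateness));
    bufferState.add(context.element());
  }

  @OnTimer("windowEnd")
  public void onWindowEnd(
      OnTimerContext context,
      @StateId("buffer") BagState<KV<String, Long>> bufferState) {
    // The enrichment call from the original snippet would wrap bufferState.read() here.
    for (KV<String, Long> event : bufferState.read()) {
      context.output(event);
    }
    bufferState.clear();
  }
}
```

With your one-minute fixed windows and zero allowed lateness, you would
apply it as ParDo.of(new EnrichWithTimer(Duration.ZERO)), and the timer
should fire once the watermark passes the last timestamp of each window.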

On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <
[email protected]> wrote:

> Hello dear Beam community,
> Sorry, I sent this email to the dev list first, but it is really a user
> support question...
> I am writing to ask about the OnWindowExpiration annotation on DoFn.
> Does anyone have a working snippet that uses it?
>
> I am trying to write a DoFn that makes a batch RPC when the window closes.
> It is a BigQuery call for a historical metric value updated by an external
> process. I want to execute this query and sum the results with my events
> buffered in state. OnWindowExpiration looks well suited to this.
>
> It looks like the function annotated with @OnWindowExpiration is never
> called. My pipeline runs on Dataflow; perhaps it's not a supported feature
> on this runner…
>
> Here is a snippet of what I am trying to accomplish. It seems the annotated
> function is never called: the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
> pieces of information found in this PR.
> <https://github.com/apache/beam/pull/4482>
>
>
> // The window definition used in the pipeline is set in an upstream transform
> // Window<KV<String, Long>> w =
> // Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1L)))
> // .withAllowedLateness(Duration.ZERO)
> // .discardingFiredPanes();
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
>
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     for (KV<String, Long> enrichedEvent :
>         enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
>
>
> Thanks for your help,
>
>
> Augustin
>
>
