Sorry you hit this issue.

Implementation of this feature has been marked as in progress [1] for a while.
It looks to be stalled, so I unassigned the ticket. There is no explicit
runner support yet, though the existing implementation is clever enough
that it may work automatically on many runners.

Kenn

[1] https://issues.apache.org/jira/browse/BEAM-1589

On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote:

> I've noticed this doesn't seem to work either.  The workaround is to just
> schedule an event-time timer at the end of the window + allowed lateness.
> The built-in GroupIntoBatches transform [1] does just this, I suspect to
> work around the issue as well.
>
> [1]
> https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
>
> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <
> [email protected]> wrote:
>
>> Hello dear Beam community,
>> Sorry, I sent this email to the dev list first, but it's a user support
>> question...
>> I have a question about the @OnWindowExpiration annotation on DoFn.
>> Does anyone have a working snippet that uses it?
>>
>> I am trying to write a DoFn that makes a batch RPC when the window closes.
>> It is a BigQuery call for a historical metric value updated by an external
>> process. I want to execute this query and sum the results with my events
>> buffered in state. @OnWindowExpiration looks very practical for this.
>>
>> It looks like the function annotated with @OnWindowExpiration is never
>> called. My pipeline runs on Dataflow; perhaps it's not a supported feature
>> on this runner…
>>
>> Here is a snippet of what I am trying to accomplish. It seems the
>> annotated function is never called: the log line never appears. Am I
>> missing something?
>> I tried to replicate the logic found in this blog post
>> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
>> pieces of information found in this PR
>> <https://github.com/apache/beam/pull/4482>.
>>
>>
>> // The window definition used in the pipeline is set in an upstream transform:
>> // Window<KV<String, Long>> w =
>> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
>> //         .withAllowedLateness(Duration.ZERO)
>> //         .discardingFiredPanes();
>>
>> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>>
>>   @StateId("buffer")
>>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>>       StateSpecs.bag();
>>
>>   @ProcessElement
>>   public void process(
>>       final ProcessContext context,
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>>     bufferState.add(context.element());
>>     context.output(context.element());
>>   }
>>
>>   @OnWindowExpiration
>>   public void onWindowExpiration(
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>>     LOG.info("The window expired");
>>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>>       outputReceiver.output(enrichedEvent);
>>     }
>>   }
>> }
>>
>>
>> Thanks for your help,
>>
>>
>> Augustin
>>
>>
>
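
For reference, the workaround Steve describes above (an event-time timer set
to the end of the window plus the allowed lateness) might look roughly like the
sketch below. It is not the original poster's code: the class name
EnrichWithTimer, the timer id "expiry", the expiryTime helper, and passing the
allowed lateness through the constructor are all assumptions made for
illustration. The BigQuery enrichment step from the original snippet is left
out, so the callback simply emits the buffered events.

```java
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical variant of the Enrich DoFn that uses an event-time timer
// instead of @OnWindowExpiration.
public final class EnrichWithTimer extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Assumption: must match the allowed lateness of the upstream Window transform.
  private final Duration allowedLateness;

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = StateSpecs.bag();

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  public EnrichWithTimer(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  // End of the window plus allowed lateness: the point after which no more
  // input is expected for this window.
  public static Instant expiryTime(BoundedWindow window, Duration allowedLateness) {
    return window.maxTimestamp().plus(allowedLateness);
  }

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      BoundedWindow window,
      @StateId("buffer") BagState<KV<String, Long>> bufferState,
      @TimerId("expiry") Timer expiryTimer) {
    // Setting the timer again for each element just overwrites it with the
    // same timestamp for this key and window.
    expiryTimer.set(expiryTime(window, allowedLateness));
    bufferState.add(element);
  }

  @OnTimer("expiry")
  public void onExpiry(
      @StateId("buffer") BagState<KV<String, Long>> bufferState,
      OutputReceiver<KV<String, Long>> out) {
    // Placeholder for the enrichment step: emit the buffered events as-is.
    for (KV<String, Long> event : bufferState.read()) {
      out.output(event);
    }
    bufferState.clear();
  }
}
```

With the one-minute fixed windows and Duration.ZERO allowed lateness from the
original snippet, this would be applied as
ParDo.of(new EnrichWithTimer(Duration.ZERO)), and the timer would be set to the
window's maximum timestamp.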
