I think I found the issue. I had not realized that state is partitioned per key (and per window). So if I want to limit how often the callback fires, I need to change my partitioning logic to a more coarse-grained one. Please tell me if I'm wrong, or if a special feature exists to access a global state, but because of shuffling I think it may not be possible or desirable.
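To illustrate the point about partitioning, here is a minimal plain-Java model of how state and timers are scoped. It is not Beam code and not a Beam API. Beam keeps one set of state cells and timers per key and window. Setting a timer again for the same key and window overwrites it rather than scheduling a second one. The class `KeyedTimerModel`, its methods, and the hash-bucket re-keying are hypothetical names chosen for this sketch. It only counts how many end-of-window callbacks would fire, assuming fixed windows and event times in milliseconds.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of Beam's per-key, per-window timer scoping (not Beam's API).
final class KeyedTimerModel {

  // Counts end-of-window timer callbacks for (key, eventTimeMillis) events
  // assigned to fixed windows of windowMillis.
  static int timerFirings(List<Map.Entry<String, Long>> events, long windowMillis) {
    // One timer per (key, windowStart): re-setting it only overwrites its target time.
    Map<Map.Entry<String, Long>, Long> timers = new HashMap<>();
    for (Map.Entry<String, Long> event : events) {
      long eventTime = event.getValue();
      long windowStart = eventTime - Math.floorMod(eventTime, windowMillis);
      long maxTimestamp = windowStart + windowMillis - 1; // window end minus 1 ms
      timers.put(new AbstractMap.SimpleImmutableEntry<>(event.getKey(), windowStart), maxTimestamp);
    }
    return timers.size();
  }

  // Hypothetical coarse re-keying: each fine-grained key maps to one of numBuckets buckets.
  static String bucketOf(String key, int numBuckets) {
    return "bucket-" + Math.floorMod(Objects.hashCode(key), numBuckets);
  }

  // Replaces every event's key with its bucket, keeping the event time.
  static List<Map.Entry<String, Long>> rekey(List<Map.Entry<String, Long>> events, int numBuckets) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (Map.Entry<String, Long> event : events) {
      out.add(new AbstractMap.SimpleImmutableEntry<>(bucketOf(event.getKey(), numBuckets), event.getValue()));
    }
    return out;
  }
}
```

With keys `a`, `b` and `c` in the same one-minute window, the model counts three timer firings for that window. After re-keying everything to a single bucket, it counts one. In a real pipeline you would keep the original key inside the value (for example `KV<bucket, KV<originalKey, value>>`). Fewer buckets also means less parallelism, because elements sharing a key are processed serially against that key's state.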
Thanks!
Augustin

> On 26 Feb 2019 at 14:06, Augustin Lafanechere <[email protected]> wrote:
>
> Many thanks for your answers!
> The GroupIntoBatches transform nearly implements the logic I am after, but I
> just want to execute the RPC call at the end of the window, not flush the
> batch when the size limit is reached.
>
> In order to do so, I reimplemented the part of the GroupIntoBatches logic
> that guarantees a batch flush at the end of the window.
>
> According to my logs, it looks like the @OnTimer callback is fired for every
> element that reaches processElement. Is this the expected behavior? I want
> the callback to execute only once (when the window closes).
>
> Thanks for your help! Please find below the snippet I am currently running.
>
> Augustin
>
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.joda.time.Instant;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
>
>   @TimerId("endOfWindow")
>   private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @StateId("batch")
>   private final StateSpec<BagState<KV<String, Long>>> batchSpec =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void processElement(
>       final @TimerId("endOfWindow") Timer windowTimer,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final @Element KV<String, Long> element,
>       final BoundedWindow window,
>       final OutputReceiver<KV<String, Long>> receiver) {
>
>     Instant windowExpires = window.maxTimestamp();
>
>     LOG.info(
>         "*** SET TIMER *** to point in time {} for window {}",
>         windowExpires.toString(),
>         window.toString());
>     // State and timers are scoped to the current key and window: setting the
>     // timer again for the same key and window overwrites it, it does not add one.
>     windowTimer.set(windowExpires);
>     batch.add(element);
>     LOG.info("*** BATCH *** Add element for window {} ", window.toString());
>   }
>
>   @OnTimer("endOfWindow")
>   public void onTimerCallback(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final @Timestamp Instant timestamp,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final BoundedWindow window) {
>     // Fires once per key and window, when the watermark passes windowExpires.
>     LOG.info(
>         "*** END OF WINDOW *** for timer timestamp {} in window {}",
>         timestamp, window.toString());
>     flushBatch(receiver, batch);
>   }
>
>   private void flushBatch(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final BagState<KV<String, Long>> batch) {
>     // When the timer fires, the bag might be empty; iterating it then outputs nothing.
>     for (KV<String, Long> elem : batch.read()) {
>       receiver.output(elem);
>     }
>     batch.clear();
>     LOG.info("*** BATCH *** clear");
>   }
> }
>
>> On 26 Feb 2019 at 00:49, Kenneth Knowles <[email protected]> wrote:
>>
>> Sorry you hit this issue.
>>
>> Implementation of this feature has been marked in progress [1] for a while.
>> It looks to be stalled, so I unassigned the ticket. There is no explicit
>> runner support yet, though the existing implementation is clever enough
>> that it may automatically work for many runners.
>>
>> Kenn
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-1589
>>
>> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote:
>> I've noticed this doesn't seem to work either. The workaround is to just
>> schedule an event-time timer at the end of the window + allowed lateness.
>> The built-in GroupIntoBatches transform [1] does just this, I suspect to
>> work around the issue as well.
>>
>> [1] https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
>>
>> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <[email protected]> wrote:
>> Hello dear Beam community,
>> Sorry, I sent this email to the dev list first, but it's a user support
>> question...
>> I would like to ask you a question about the OnWindowExpiration
>> annotation on DoFn.
>> Does any of you have a working snippet using it?
>>
>> I am trying to write a DoFn that makes a batch RPC when the window closes.
>> It is a BigQuery call for a historical metric value updated by an external
>> process. I want to execute this query and sum the results with my events
>> buffered in state. OnWindowExpiration looks very practical for this.
>>
>> It looks like the function annotated with @OnWindowExpiration is never
>> called. My pipeline runs on Dataflow; perhaps it's not a supported feature
>> on this runner…
>>
>> Here is a snippet of what I am trying to accomplish. It seems like the
>> annotated function is never called: the log line never appears. Am I
>> missing something?
>> I tried to replicate the logic found in this blog post
>> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and pieces
>> of information found in this PR <https://github.com/apache/beam/pull/4482>.
>>
>> // The window definition used in the pipeline, set in an upstream transform:
>> // Window<KV<String, Long>> w =
>> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
>> //         .withAllowedLateness(Duration.ZERO)
>> //         .discardingFiredPanes();
>>
>> import org.apache.beam.sdk.state.BagState;
>> import org.apache.beam.sdk.state.StateSpec;
>> import org.apache.beam.sdk.state.StateSpecs;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>>
>>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
>>
>>   @StateId("buffer")
>>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>>       StateSpecs.bag();
>>
>>   @ProcessElement
>>   public void process(
>>       final ProcessContext context,
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>>     bufferState.add(context.element());
>>     context.output(context.element());
>>   }
>>
>>   @OnWindowExpiration
>>   public void onWindowExpiration(
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>>     LOG.info("The window expired");
>>     // enrichWithBigQuery is the author's own helper, not shown in the thread.
>>     for (KV<String, Long> enrichedEvent :
>>         enrichWithBigQuery(bufferState.read())) {
>>       outputReceiver.output(enrichedEvent);
>>     }
>>   }
>> }
>>
>> Thanks for your help,
>>
>> Augustin
>>
>> Chauffeur Privé is becoming kapten_. More information here:
>> <https://www.kapten.com/fr/manifesto.html>
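The original goal in this thread can also be sketched in plain Java: buffer events in state, then, when the window closes, make one batched lookup and sum the results. This is a hypothetical model, not Beam code. `WindowEndEnricher`, `expiryTimer`, and the `historicalLookup` function (standing in for the BigQuery call) are assumptions made for illustration. The buffer plays the role of a single `BagState` under one coarse key that holds (original key, value) pairs. `expiryTimer` computes the event-time timer target from the workaround suggested in the thread: the window's maximum timestamp plus the allowed lateness. That maximum timestamp is the window end minus one millisecond, as with Beam's `IntervalWindow.maxTimestamp()`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical plain-Java model of "buffer in state, enrich once at window end".
// Not Beam code: in a pipeline, flush() would correspond to the timer callback.
final class WindowEndEnricher {

  // Event-time timer target: window end minus 1 ms (max timestamp) plus allowed lateness.
  static Instant expiryTimer(Instant windowEnd, Duration allowedLateness) {
    return windowEnd.minusMillis(1).plus(allowedLateness);
  }

  // Stands in for a BagState under one coarse key: original key -> buffered values.
  private final Map<String, List<Long>> buffer = new HashMap<>();

  void add(String originalKey, long value) {
    buffer.computeIfAbsent(originalKey, k -> new ArrayList<>()).add(value);
  }

  // Called once when the timer fires: one batched lookup for all buffered keys,
  // each key's buffered values summed with its historical value, then state cleared.
  Map<String, Long> flush(Function<Set<String>, Map<String, Long>> historicalLookup) {
    Map<String, Long> historical = historicalLookup.apply(new HashSet<>(buffer.keySet()));
    Map<String, Long> out = new HashMap<>();
    for (Map.Entry<String, List<Long>> e : buffer.entrySet()) {
      long sum = historical.getOrDefault(e.getKey(), 0L);
      for (long v : e.getValue()) {
        sum += v;
      }
      out.put(e.getKey(), sum);
    }
    buffer.clear();
    return out;
  }
}
```

Aiming the timer at the max timestamp plus the allowed lateness means late elements admitted by the windowing strategy are still buffered before the flush. With `Duration.ZERO`, as in the window definition above, the timer lands exactly on the window's max timestamp.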
