I think I've found the issue. I hadn't understood that state is partitioned by
key. So if I want to limit how often the callback fires, I need to change my
partitioning logic to a more coarse-grained one. Please tell me if I'm wrong,
or if a special feature exists to access a global state, but due to shuffling
I think it may not be possible / desirable.
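Below is a toy, stdlib-only Java model of the point above. It is not Beam's API. Beam scopes state and timers per key and window, so an end-of-window callback fires once for every distinct (key, window) pair that received data, not once per window. Re-keying elements to a coarser key collapses those pairs. The `Element` class, the `countFirings` helper and the constant key `"all"` are hypothetical, chosen only for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Toy model (NOT Beam's API): state and timers live per (key, window). */
public final class KeyScopeModel {

  /** Hypothetical element: a key plus the start of its fixed window. */
  public static final class Element {
    final String key;
    final long windowStartMillis;

    public Element(String key, long windowStartMillis) {
      this.key = key;
      this.windowStartMillis = windowStartMillis;
    }
  }

  /**
   * Number of end-of-window callbacks if each element is re-keyed with keyFn:
   * one per distinct (key, window) pair, since each pair owns its own timer.
   */
  public static int countFirings(List<Element> elements, Function<String, String> keyFn) {
    Set<String> scopes = new HashSet<>();
    for (Element e : elements) {
      scopes.add(keyFn.apply(e.key) + "@" + e.windowStartMillis);
    }
    return scopes.size();
  }

  public static void main(String[] args) {
    List<Element> elements = Arrays.asList(
        new Element("user-1", 0L),
        new Element("user-2", 0L),
        new Element("user-3", 0L),
        new Element("user-1", 60_000L));

    // Original fine-grained keys: 4 distinct (key, window) pairs.
    System.out.println(countFirings(elements, k -> k)); // 4
    // One coarse constant key: a single firing per window, 2 in total.
    System.out.println(countFirings(elements, k -> "all")); // 2
  }
}
```

The trade-off of the coarsest choice (one constant key) is parallelism: all elements sharing a key are processed by a single worker, so each window's whole batch is handled serially.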

Thanks!

Augustin



> On 26 Feb 2019, at 14:06, Augustin Lafanechere
> <[email protected]> wrote:
> 
> Many thanks for your answers!
> The GroupIntoBatches transform nearly implements the logic I am after, but I
> only want to execute the RPC call at the end of the window, not flush when
> the batch size limit is reached.
> 
> To do so, I reimplemented the part of GroupIntoBatches that guarantees a
> batch flush at the end of the window.
> 
> According to my logs, it looks like the @OnTimer callback fires for every
> element that reaches processElement. Is this the expected behavior? I am
> looking to execute the callback only once (when the window closes).
> 
> Thanks for your help! Please find below the snippet I am currently running.
> 
> Augustin
> 
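> import com.google.common.collect.Iterables;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.joda.time.Instant;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
> 
>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);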
> 
>   @TimerId("endOfWindow")
>   private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
> 
>   @StateId("batch")
>   private final StateSpec<BagState<KV<String, Long>>> batchSpec = StateSpecs.bag();
> 
>   @ProcessElement
>   public void processElement(
>       final @TimerId("endOfWindow") Timer windowTimer,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final @Element KV<String, Long> element,
>       final BoundedWindow window,
>       final OutputReceiver<KV<String, Long>> receiver) {
> 
>     Instant windowExpires = window.maxTimestamp();
> 
>     LOG.info(
>         "*** SET TIMER *** to point in time {} for window {}",
>         windowExpires.toString(),
>         window.toString());
>     windowTimer.set(windowExpires);
>     batch.add(element);
>     LOG.info("*** BATCH *** Add element for window {} ", window.toString());
>   }
> 
>   @OnTimer("endOfWindow")
>   public void onTimerCallback(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final @Timestamp Instant timestamp,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final BoundedWindow window) {
>     LOG.info(
>         "*** END OF WINDOW *** for timer timestamp {} in window {}",
>         timestamp,
>         window.toString());
>     flushBatch(receiver, batch);
>   }
> 
>   private void flushBatch(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final BagState<KV<String, Long>> batch) {
>     Iterable<KV<String, Long>> values = batch.read();
>     // when the timer fires, batch state might be empty
>     if (!Iterables.isEmpty(values)) {
>       for (KV<String, Long> elem : values) {
>         receiver.output(elem);
>       }
>     }
>     batch.clear();
>     LOG.info("*** BATCH *** clear");
>   }
> }
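A second toy model (again stdlib-only, not Beam's API) of why the callback can look like it fires once per element. Calling `windowTimer.set(window.maxTimestamp())` on every element only replaces the single pending timer for that key and window, so each (key, window) fires once after the watermark passes it. If every element carries a distinct key, that is still one firing per element. `TimerOverwriteModel`, its string scope keys and the firing rule (timestamp strictly before the watermark) are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT Beam's API): one pending timer per (key, window) and timer ID. */
public final class TimerOverwriteModel {

  // Pending timer timestamp (millis) for each "key@windowStart" scope.
  private final Map<String, Long> pending = new HashMap<>();

  /** Like windowTimer.set(ts): a later call replaces the pending timestamp. */
  public void set(String key, long windowStartMillis, long timestampMillis) {
    pending.put(key + "@" + windowStartMillis, timestampMillis);
  }

  /** Advances the watermark; returns and clears every timer it has passed. */
  public List<String> advanceWatermark(long watermarkMillis) {
    List<String> fired = new ArrayList<>();
    pending.entrySet().removeIf(entry -> {
      boolean due = entry.getValue() < watermarkMillis;
      if (due) {
        fired.add(entry.getKey());
      }
      return due;
    });
    return fired;
  }

  public static void main(String[] args) {
    TimerOverwriteModel timers = new TimerOverwriteModel();
    long maxTimestamp = 59_999L; // end of the 1-minute window [0, 60000) minus 1 ms

    // Three elements for user-1 in the same window: three set() calls...
    for (int i = 0; i < 3; i++) {
      timers.set("user-1", 0L, maxTimestamp);
    }
    // ...and one element for user-2 in that window.
    timers.set("user-2", 0L, maxTimestamp);

    // One firing per (key, window), not per element: prints 2.
    System.out.println(timers.advanceWatermark(60_000L).size());
  }
}
```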
> 
> 
> 
>> On 26 Feb 2019, at 00:49, Kenneth Knowles <[email protected]> wrote:
>> 
>> Sorry you hit this issue.
>> 
>> Implementation of this feature has been marked in progress [1] for a while.
>> It looks to be stalled, so I unassigned the ticket. There is no explicit
>> runner support yet, though the existing implementation is clever enough
>> that it may automatically work for many runners.
>> 
>> Kenn
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-1589 
>> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote:
>> I've noticed this doesn't seem to work either. The workaround is to just
>> schedule an event-time timer at the end of the window plus the allowed
>> lateness. The built-in GroupIntoBatches transform [1] does just this, I
>> suspect to work around the same issue.
>> 
>> [1] 
>> https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
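The workaround described above comes down to one timestamp computation. In a Beam DoFn it would look roughly like `windowTimer.set(window.maxTimestamp().plus(allowedLateness))` with Joda types. The sketch below does the same arithmetic with `java.time` so it runs without Beam. It assumes epoch-aligned fixed windows with no offset and treats a window's max timestamp as its end minus one millisecond; `WindowExpiry` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/** Stdlib-only arithmetic (NOT Beam's API) for "end of window + allowed lateness". */
public final class WindowExpiry {

  /** Start of the epoch-aligned fixed window of the given size containing ts. */
  public static Instant windowStart(Instant ts, Duration size) {
    long t = ts.toEpochMilli();
    return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
  }

  /** Analogue of window.maxTimestamp() for a fixed window: end minus 1 ms. */
  public static Instant maxTimestamp(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size).minusMillis(1);
  }

  /** Timestamp at which to set the event-time timer. */
  public static Instant timerTimestamp(Instant ts, Duration size, Duration allowedLateness) {
    return maxTimestamp(ts, size).plus(allowedLateness);
  }

  public static void main(String[] args) {
    Instant event = Instant.parse("2019-02-25T15:24:30Z");
    Duration oneMinute = Duration.ofMinutes(1);
    // Zero allowed lateness, as in the pipeline's window definition.
    System.out.println(timerTimestamp(event, oneMinute, Duration.ZERO)); // 2019-02-25T15:24:59.999Z
    // Five minutes of allowed lateness pushes the timer back accordingly.
    System.out.println(timerTimestamp(event, oneMinute, Duration.ofMinutes(5))); // 2019-02-25T15:29:59.999Z
  }
}
```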
>> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere
>> <[email protected]> wrote:
>> Hello dear Beam community,
>> Sorry, I sent this email to the dev list first, but it's a user support
>> question...
>> I have a question about the OnWindowExpiration annotation on DoFn.
>> Does anyone have a working snippet using it?
>> 
>> I am trying to write a DoFn that makes a batch RPC when the window closes.
>> It is a BigQuery call for a historical metric value updated by an external
>> process. I want to execute this query and sum the results with the events
>> buffered in state. OnWindowExpiration looks very practical for this.
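A stdlib-only sketch of the enrichment step described above, under the assumption that "sum the results" means adding one historical value per key to the sum of that key's buffered values. `historicalLookup` is a hypothetical stand-in for the BigQuery call, and `BufferAndEnrich` is an illustrative name; the point is that the lookup runs once per key when the window closes rather than once per element.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Sketch (hypothetical): sum buffered events with a historical value at window close. */
public final class BufferAndEnrich {

  /**
   * Sums the buffered values per key, then adds one historical value per key.
   * historicalLookup is a hypothetical stand-in for the BigQuery call.
   */
  public static Map<String, Long> enrichAtWindowClose(
      List<Map.Entry<String, Long>> buffered, ToLongFunction<String> historicalLookup) {
    Map<String, Long> sums = new HashMap<>();
    for (Map.Entry<String, Long> event : buffered) {
      sums.merge(event.getKey(), event.getValue(), Long::sum);
    }
    // One lookup per key when the window closes, not one per buffered element.
    sums.replaceAll((key, sum) -> sum + historicalLookup.applyAsLong(key));
    return sums;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> buffered = Arrays.<Map.Entry<String, Long>>asList(
        new AbstractMap.SimpleEntry<>("a", 1L),
        new AbstractMap.SimpleEntry<>("a", 2L),
        new AbstractMap.SimpleEntry<>("b", 5L));
    Map<String, Long> historical = new HashMap<>();
    historical.put("a", 100L);
    historical.put("b", 10L);

    Map<String, Long> enriched =
        enrichAtWindowClose(buffered, key -> historical.getOrDefault(key, 0L));
    System.out.println(enriched.get("a")); // 103
    System.out.println(enriched.get("b")); // 15
  }
}
```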
>> 
>> It looks like the function annotated with @OnWindowExpiration is never called.
>> My pipeline runs on Dataflow; perhaps it's not a supported feature on this
>> runner…
>> 
>> Here is a snippet of what I am trying to accomplish. The annotated function
>> never seems to be called: the log line never appears. Am I missing
>> something?
>> I tried to replicate the logic found in this blog post
>> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and pieces
>> of information found in this PR <https://github.com/apache/beam/pull/4482>.
>> 
>> 
>> import org.apache.beam.sdk.state.BagState;
>> import org.apache.beam.sdk.state.StateSpec;
>> import org.apache.beam.sdk.state.StateSpecs;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> // The window definition used in the pipeline is set in an upstream transform:
>> // Window<KV<String, Long>> w =
>> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
>> //         .withAllowedLateness(Duration.ZERO)
>> //         .discardingFiredPanes();
>>
>> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>>
>>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
>>
>>   @StateId("buffer")
>>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = StateSpecs.bag();
>>
>>   @ProcessElement
>>   public void process(
>>       final ProcessContext context,
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>>     bufferState.add(context.element());
>>     context.output(context.element());
>>   }
>>
>>   @OnWindowExpiration
>>   public void onWindowExpiration(
>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>>     LOG.info("The window expired");
>>     // enrichWithBigQuery is the author's own helper (not shown in the thread).
>>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>>       outputReceiver.output(enrichedEvent);
>>     }
>>   }
>> }
>> 
>> 
>> Thanks for your help,
>> 
>> 
>> Augustin
>> 
>> Chauffeur Privé is becoming kapten_. More information here:
>> <https://www.kapten.com/fr/manifesto.html>
> 

