Many thanks for your answers!
The GroupIntoBatches transform nearly implements the logic I am after, but I
only want to execute the RPC call at the end of the window, not flush when the
batch size limit is reached.

To do so, I reimplemented the part of GroupIntoBatches that guarantees the
batch is flushed at the end of the window.

According to my logs, it looks like the @OnTimer callback is fired for every
element that reaches processElement. Is this the expected behavior? I am aiming
to execute the callback only once (when the window closes).

Thanks for your help! Please find below the snippet I am currently running.

Augustin

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Buffers elements in state and emits them when an event-time timer,
// set to the end of the window, fires.
public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {

  private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);

  @TimerId("endOfWindow")
  private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("batch")
  private final StateSpec<BagState<KV<String, Long>>> batchSpec = StateSpecs.bag();

  @ProcessElement
  public void processElement(
      final @TimerId("endOfWindow") Timer windowTimer,
      final @StateId("batch") BagState<KV<String, Long>> batch,
      final @Element KV<String, Long> element,
      final BoundedWindow window) {

    // Last timestamp that still belongs to this window.
    Instant windowExpires = window.maxTimestamp();

    LOG.info("*** SET TIMER *** to point in time {} for window {}", windowExpires, window);
    windowTimer.set(windowExpires);
    batch.add(element);
    LOG.info("*** BATCH *** Add element for window {}", window);
  }

  @OnTimer("endOfWindow")
  public void onTimerCallback(
      final OutputReceiver<KV<String, Long>> receiver,
      final @Timestamp Instant timestamp,
      final @StateId("batch") BagState<KV<String, Long>> batch,
      final BoundedWindow window) {
    LOG.info("*** END OF WINDOW *** for timer timestamp {} in window {}", timestamp, window);
    flushBatch(receiver, batch);
  }

  private void flushBatch(
      final OutputReceiver<KV<String, Long>> receiver,
      final BagState<KV<String, Long>> batch) {
    // When the timer fires, the batch state might be empty;
    // iterating an empty bag simply outputs nothing.
    for (KV<String, Long> elem : batch.read()) {
      receiver.output(elem);
    }
    batch.clear();
    LOG.info("*** BATCH *** clear");
  }
}
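A possible explanation, offered as an assumption rather than a confirmed diagnosis: Beam scopes state and timers per key and per window, and setting a timer again with the same id in the same key and window replaces the earlier setting instead of adding another one. If that holds here, the @OnTimer callback fires once per distinct key in each window, which looks like "once per element" when most keys are unique. The following plain-Java toy model (no Beam dependency; the class TimerScopeModel and its methods are hypothetical, not Beam APIs) simulates that scoping:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, not Beam code: at most one pending timer per (key, window) pair.
public class TimerScopeModel {
  // "key|windowMaxTimestamp" -> firing time. A second set() for the same
  // pair overwrites the first, so there is still a single pending timer.
  private final Map<String, Long> timers = new TreeMap<>();

  // Mimics processElement calling windowTimer.set(window.maxTimestamp()).
  public void processElement(String key, long windowMaxTimestamp) {
    timers.put(key + "|" + windowMaxTimestamp, windowMaxTimestamp);
  }

  // Fires (and removes) every timer whose time is at or before the watermark;
  // returns the (key, window) scopes whose callback ran, in key order.
  public List<String> advanceWatermark(long watermark) {
    List<String> fired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (entry.getValue() <= watermark) {
        fired.add(entry.getKey());
        it.remove();
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    TimerScopeModel model = new TimerScopeModel();
    long windowMax = 59_999L; // last timestamp of the window [0 ms, 60000 ms)
    for (String key : new String[] {"a", "a", "a", "b", "c"}) {
      model.processElement(key, windowMax);
    }
    // Five elements, three distinct keys: three callbacks, not five.
    System.out.println(model.advanceWatermark(60_000L)); // prints [a|59999, b|59999, c|59999]
  }
}
```

If this is the cause, getting a single callback per window would require all elements to share one key (for example by re-keying to a constant before this DoFn), at the cost of processing each window's batch on a single worker.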



> On 26 Feb 2019 at 00:49, Kenneth Knowles <[email protected]> wrote:
> 
> Sorry you hit this issue.
> 
> Implementation of this feature has been marked as in progress [1] for a while.
> It looks stalled, so I unassigned the ticket. There is no explicit runner
> support yet, though the existing implementation is clever enough that it may
> work automatically on many runners.
> 
> Kenn
> 
> [1] https://issues.apache.org/jira/browse/BEAM-1589 
> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote:
> I've noticed this doesn't seem to work either. The workaround is to schedule
> an event-time timer at the end of the window plus the allowed lateness.
> The built-in GroupIntoBatches transform [1] does exactly this, I suspect also
> to work around the issue.
> 
> [1] https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
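For reference, the timestamp this workaround targets can be computed as follows. This is a plain-Java sketch (java.time only, no Beam dependency) assuming epoch-aligned fixed windows like the one-minute FixedWindows definition quoted further down; the class ExpiryTimerTime is hypothetical. Inside a DoFn the equivalent would be along the lines of windowTimer.set(window.maxTimestamp().plus(allowedLateness)), where allowedLateness is assumed to be passed to the DoFn's constructor. Beam's maxTimestamp() for an interval window is its end minus 1 ms.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: the event-time timer target used by the workaround,
// for fixed windows aligned to the epoch (no offset).
public class ExpiryTimerTime {
  // Last timestamp inside the fixed window containing ts: window end minus 1 ms.
  public static Instant windowMaxTimestamp(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    long millis = ts.toEpochMilli();
    long start = millis - Math.floorMod(millis, sizeMs);
    return Instant.ofEpochMilli(start + sizeMs - 1);
  }

  // End of window plus allowed lateness: the point at which the window expires.
  public static Instant expiryTimer(Instant ts, Duration size, Duration allowedLateness) {
    return windowMaxTimestamp(ts, size).plus(allowedLateness);
  }

  public static void main(String[] args) {
    Instant ts = Instant.parse("2019-02-25T15:24:30Z");
    System.out.println(expiryTimer(ts, Duration.ofMinutes(1), Duration.ZERO));
    // prints 2019-02-25T15:24:59.999Z
    System.out.println(expiryTimer(ts, Duration.ofMinutes(1), Duration.ofSeconds(30)));
    // prints 2019-02-25T15:25:29.999Z
  }
}
```

With zero allowed lateness, as in the pipeline below, the timer target is simply the window's maxTimestamp().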
> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <[email protected]>
> wrote:
> Hello dear Beam community,
> Sorry, I first sent this email to the dev list, but it's a user support question...
> I have a question about the @OnWindowExpiration annotation on DoFn.
> Does any of you have a working snippet that uses it?
> 
> I am trying to write a DoFn that makes a batch RPC when the window closes. It
> is a BigQuery call for a historical metric value updated by an external
> process. I want to execute this query and sum the results with my events
> buffered in state. @OnWindowExpiration looks very practical for this.
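The buffer-then-enrich idea above can be sketched without Beam to show its shape: cheap buffering per element, and a single lookup per key when the window expires. This is a plain-Java toy under stated assumptions: "sum the results with my events" is read as adding one historical value per key to each buffered event, the lookup function stands in for the BigQuery call, and the class BufferThenEnrich and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Toy model of one key's buffer within one window (not Beam code).
public class BufferThenEnrich {
  private final List<Long> buffer = new ArrayList<>();   // plays the role of BagState
  private final ToLongFunction<String> historicalLookup; // plays the role of the BigQuery call
  private int lookupCount = 0;

  public BufferThenEnrich(ToLongFunction<String> historicalLookup) {
    this.historicalLookup = historicalLookup;
  }

  // Per element: only buffer, no RPC.
  public void add(long value) {
    buffer.add(value);
  }

  // At window expiry: one lookup for the key, added to every buffered value;
  // the buffer is then cleared.
  public List<Long> onExpiry(String key) {
    lookupCount++;
    long historical = historicalLookup.applyAsLong(key);
    List<Long> enriched = new ArrayList<>();
    for (long value : buffer) {
      enriched.add(value + historical);
    }
    buffer.clear();
    return enriched;
  }

  public int lookupCount() {
    return lookupCount;
  }

  public static void main(String[] args) {
    // Hypothetical historical value of 100 for every key.
    BufferThenEnrich state = new BufferThenEnrich(key -> 100L);
    state.add(3L);
    state.add(4L);
    state.add(5L);
    System.out.println(state.onExpiry("metric-a")); // prints [103, 104, 105]
    System.out.println(state.lookupCount());        // prints 1
  }
}
```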
> 
> It looks like the function annotated with @OnWindowExpiration is never called.
> My pipeline runs on Dataflow; perhaps it's not a supported feature on this
> runner…
> 
> Here is a snippet of what I am trying to accomplish. The annotated function
> never seems to be called: the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
> information found in this PR <https://github.com/apache/beam/pull/4482>.
> 
> 
> // The window definition used in the pipeline is set in an upstream transform:
> // Window<KV<String, Long>> w =
> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
> //         .withAllowedLateness(Duration.ZERO)
> //         .discardingFiredPanes();
> 
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
> 
>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
> 
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = StateSpecs.bag();
> 
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
> 
>   // Expected to run once per key and window, when the window expires.
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     // enrichWithBigQuery(...) is the author's helper (not shown in this snippet).
>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
> 
> 
> Thanks for your help,
> 
> 
> Augustin
> 
> Chauffeur Privé becomes kapten_. More information here:
> <https://www.kapten.com/fr/manifesto.html>

