Many thanks for your answers!
The GroupIntoBatches transform nearly implements the logic I am after, but I
only want to execute the RPC call at the end of the window, not to flush
whenever the batch size limit is reached.
To do so, I reimplemented the part of GroupIntoBatches that guarantees a batch
flush at the end of the window.
According to my logs, it looks like the @OnTimer callback is fired for every
element that reaches processElement. Is this the expected behavior? I would
like the callback to execute only once (when the window closes).
Thanks for your help! Please find below the snippet I am currently running.
Augustin
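import com.google.common.collect.Iterables; // assumes plain Guava is on the classpath
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Log under this class's name (the snippet originally used TestPipeline.class).
  private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);

  @TimerId("endOfWindow")
  private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("batch")
  private final StateSpec<BagState<KV<String, Long>>> batchSpec =
      StateSpecs.bag();

  // Buffers each element and (re)sets the event-time timer to the window end.
  @ProcessElement
  public void processElement(
      final @TimerId("endOfWindow") Timer windowTimer,
      final @StateId("batch") BagState<KV<String, Long>> batch,
      final @Element KV<String, Long> element,
      final BoundedWindow window,
      final OutputReceiver<KV<String, Long>> receiver) {
    Instant windowExpires = window.maxTimestamp();
    LOG.info(
        "*** SET TIMER *** to point in time {} for window {}",
        windowExpires.toString(),
        window.toString());
    windowTimer.set(windowExpires);
    batch.add(element);
    LOG.info("*** BATCH *** Add element for window {} ", window.toString());
  }

  // Flushes the buffered elements when the end-of-window timer fires.
  @OnTimer("endOfWindow")
  public void onTimerCallback(
      final OutputReceiver<KV<String, Long>> receiver,
      final @Timestamp Instant timestamp,
      final @StateId("batch") BagState<KV<String, Long>> batch,
      final BoundedWindow window) {
    LOG.info(
        "*** END OF WINDOW *** for timer timestamp {} in windows {}",
        timestamp, window.toString());
    flushBatch(receiver, batch);
  }

  private void flushBatch(
      final OutputReceiver<KV<String, Long>> receiver,
      final BagState<KV<String, Long>> batch) {
    Iterable<KV<String, Long>> values = batch.read();
    // when the timer fires, batch state might be empty
    if (!Iterables.isEmpty(values)) {
      for (KV<String, Long> elem : values) {
        receiver.output(elem);
      }
    }
    batch.clear();
    LOG.info("*** BATCH *** clear");
  }
}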
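
A possible reading of the logs above, offered as a sketch rather than a diagnosis: in Beam, the state and timers of a stateful DoFn are scoped per key and window, and setting a timer again under the same id replaces the pending one. If every element carries a distinct key, one callback per element is what this model predicts. The plain-Java simulation below does not use Beam at all; `TimerScopeSketch`, its methods, and the 60-second window size are hypothetical names and values chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam code) of timers and bag state scoped per (key, window). */
public class TimerScopeSketch {

    // Hypothetical fixed window size, mirroring the 1-minute windows in the thread.
    static final long WINDOW_MILLIS = 60_000L;

    // One pending timer and one bag per "key@windowStart" scope.
    private final Map<String, Long> timers = new LinkedHashMap<>();
    private final Map<String, List<Long>> bags = new LinkedHashMap<>();

    /** Buffers the value and (re)sets the end-of-window timer for its scope. */
    void processElement(String key, long value, long timestampMillis) {
        // Assumes non-negative timestamps so the modulo yields the window start.
        long windowStart = timestampMillis - (timestampMillis % WINDOW_MILLIS);
        long maxTimestamp = windowStart + WINDOW_MILLIS - 1;
        String scope = key + "@" + windowStart;
        timers.put(scope, maxTimestamp); // same scope: overwrites the pending timer
        bags.computeIfAbsent(scope, s -> new ArrayList<>()).add(value);
    }

    /** Fires each pending timer once and returns the flushed batches in order. */
    List<List<Long>> fireAllTimers() {
        List<List<Long>> flushed = new ArrayList<>();
        for (String scope : timers.keySet()) {
            List<Long> batch = bags.remove(scope);
            flushed.add(batch == null ? new ArrayList<>() : batch);
        }
        timers.clear();
        return flushed;
    }

    public static void main(String[] args) {
        // Three elements with three distinct keys in the same window.
        TimerScopeSketch distinctKeys = new TimerScopeSketch();
        distinctKeys.processElement("a", 1L, 1_000L);
        distinctKeys.processElement("b", 2L, 2_000L);
        distinctKeys.processElement("c", 3L, 3_000L);
        System.out.println(distinctKeys.fireAllTimers()); // [[1], [2], [3]]

        // Three elements sharing one key in the same window.
        TimerScopeSketch sharedKey = new TimerScopeSketch();
        sharedKey.processElement("a", 1L, 1_000L);
        sharedKey.processElement("a", 2L, 2_000L);
        sharedKey.processElement("a", 3L, 3_000L);
        System.out.println(sharedKey.fireAllTimers()); // [[1, 2, 3]]
    }
}
```

If the real pipeline shows one callback per element, comparing the number of distinct keys per window with the number of callbacks would be a quick way to check whether this is the cause.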
> On 26 Feb 2019, at 00:49, Kenneth Knowles <[email protected]> wrote:
>
> Sorry you hit this issue.
>
> Implementation of this feature has been marked in progress [1] for a while.
> It looks to be stalled so I unassigned the ticket. There is not any explicit
> runner support, yet, though the existing implementation is clever enough that
> it may automatically work for many runners.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-1589
> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <[email protected]> wrote:
> I've noticed this doesn't seem to work either. The workaround is to just
> schedule an event-time timer at the end of the window + allowed lateness.
> The built-in GroupIntoBatches transform [1] does just this, I suspect to work
> around the issue as well.
>
> [1] https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
>
> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <[email protected]> wrote:
> Hello dear Beam community,
> Sorry, I sent this email to the dev list first, but it is a user support
> question...
> I am writing to you with a question about the OnWindowExpiration
> annotation on DoFn.
> Does anyone have a working snippet using it?
>
> I am trying to write a DoFn that makes a batch RPC on window closure. It is a
> BigQuery call for a historical metric value updated by an external process. I
> want to execute this query and sum the results with my events buffered in a
> state. OnWindowExpiration looks very practical for accomplishing this.
>
> It looks like the function annotated with @OnWindowExpiration is never called.
> My pipeline runs on Dataflow; perhaps it is not a supported feature on this
> runner…
>
> Here is a snippet of what I am trying to accomplish. It seems that the
> annotated function is never called; the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and pieces
> of information found in this PR <https://github.com/apache/beam/pull/4482>.
>
>
> // The window definition used in the pipeline is set in an upstream transform:
> // Window<KV<String, Long>> w =
> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
> //         .withAllowedLateness(Duration.ZERO)
> //         .discardingFiredPanes();
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
>
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     for (KV<String, Long> enrichedEvent :
>         enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
>
>
> Thanks for your help,
>
>
> Augustin
>
> Chauffeur Privé is becoming kapten_. More information here:
> <https://www.kapten.com/fr/manifesto.html>