Hi all,

I am trying to figure out how to use the State API and the Timer API to
build a DoFn that builds sessions by buffering some of the elements until
all required information is available or the session window gap duration
expires.


Currently, I have the following defined in my DoFn<KV<String, VideoEvent>>:

@StateId("pendingVideoEvents")
private final StateSpec<BagState<VideoEvent>> _PENDING_VIDEO_EVENTS_STATE_SPEC =
    StateSpecs.bag(ProtoCoder.of(VideoEvent.class));

@TimerId("finallyCleanup")
private final TimerSpec _FINALLY_TIMER_SPEC =
    TimerSpecs.timer(TimeDomain.EVENT_TIME);

@OnTimer("finallyCleanup")
public void onFinallyCleanup(
    OnTimerContext c,
    @StateId("pendingVideoEvents") BagState<VideoEvent> pendingVideoEventsState) {
    Iterable<VideoEvent> events = pendingVideoEventsState.read();
    if (events == null) {
        return;
    }
    for (VideoEvent event : events) {
        String key = event.getSession() + "/" + event.getVideo();
        LOG.debug("Emitting {} -> {}", key, event);
        c.output(KV.of(key, event));
    }
}

The processElement method sets the timer whenever elements are added to the
pendingVideoEvents state, as follows:

// set the timer to some point
finallyTimer.offset(gapDuration).align(Duration.millis(1)).setRelative();


I am not sure whether I am using the Timer API correctly here. I see lots of
errors like this one:

"java.lang.IllegalStateException: TimestampCombiner moved element from 2017-06-14T04:27:05.000Z to earlier time 2017-06-14T04:17:13.732Z for window [2017-06-14T04:12:13.733Z..2017-06-14T04:17:13.733Z)
    at org.apache.beam.runners.core.java.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:738)
    at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:206)
    at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:239)
    at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:190)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:347)
    at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowViaWindowSetDoFn.processElement(GroupAlsoByWindowViaWindowSetDoFn.java:89)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:122)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:101)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:73)
    at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:106)
    at com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:198)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
    at com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:791)
    at com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:104)
    at com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker$9.run(StreamingDataflowWorker.java:873)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
"
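
To make the numbers in that message easier to read, here is a small
plain-Java check of the three timestamps it mentions. This is only a sketch
of the arithmetic, not Beam code: the class and method names are made up for
illustration, and it assumes (my reading, not something the message states)
that the "earlier time" is the last millisecond inside the half-open window
[start, end).

```java
import java.time.Duration;
import java.time.Instant;

public class WindowTimestampCheck {
    // Latest millisecond that still belongs to a half-open window [start, end).
    // Assumption: this is what the "earlier time" in the error refers to.
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    // True if ts lies inside the half-open window [start, end).
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    public static void main(String[] args) {
        // Values copied from the exception message.
        Instant start = Instant.parse("2017-06-14T04:12:13.733Z");
        Instant end = Instant.parse("2017-06-14T04:17:13.733Z");
        Instant element = Instant.parse("2017-06-14T04:27:05.000Z");

        System.out.println("window length       = " + Duration.between(start, end));
        System.out.println("window maxTimestamp = " + maxTimestamp(end));
        System.out.println("element in window   = " + contains(start, end, element));
        System.out.println("element past end by = " + Duration.between(end, element));
    }
}
```

So the window is five minutes long, the "earlier time" matches the window end
minus one millisecond, and the element's timestamp is almost ten minutes past
the end of the window it is being grouped into.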
These errors make me think I am getting something wrong with the timers. Can
someone explain the correct way to "flush" state when a window closes
(with or without timers)? A general description or example of using the
Timer API would also be helpful.

-- Ankur Chauhan
