Hi,
I am trying to set a timer at window expiration time for my use case and
expect it to fire just once per key per window.
But instead I observe that the onTimer() method gets called multiple times
almost all the time.
Here's the relevant code snippet:
@TimerId(WIN_EXP)
private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId(COUNTS)
private final StateSpec<ValueState<Map<String, Integer>>> counts =
StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId(COUNTS) ValueState<Map<String, Integer>>
countsState,
@TimerId(WIN_EXP) Timer winExpTimer) {
...
Map<String, Integer> counts = countsState.read();
if (counts == null) {
counts = new HashMap<>();
// Only place where I set the timer
winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
}
... // no return here and I do not observe exception in the pipeline
countsState.write(counts);
...
}
I tried adding logs in OnTimer:
String key = keyState.read();
if (key != null && key.equals("xxx")) {
logger.error(String.format("fired for %s.",
context.window().maxTimestamp().toDateTime()));
}
Output:
E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
Seems like this is not due to some contention, the first log and the last
is ~1minute apart. BTW, my allowed
lateness is also set to 1 minute.
Anyone can let me know if I am missing something here? I am using beam 2.22
and dataflow runner.
Thanks!