Hi,

I am trying to set a timer at window expiration time and expect it to fire
exactly once per key per window.
Instead, I observe that the onTimer() method is called multiple times for
the same window almost every time.

Here's the relevant code snippet:

@TimerId(WIN_EXP)
private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@StateId(COUNTS)
private final StateSpec<ValueState<Map<String, Integer>>> counts =
StateSpecs.value();

@ProcessElement
public void process(
                    ProcessContext context,
                    BoundedWindow window,
                    @StateId(COUNTS) ValueState<Map<String, Integer>> countsState,
                    @TimerId(WIN_EXP) Timer winExpTimer) {

  ...
  Map<String, Integer> counts = countsState.read();
  if (counts == null) {
    counts = new HashMap<>();
    // Only place where I set the timer
    winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
  }
  ... // no return here and I do not observe exception in the pipeline
  countsState.write(counts);
  ...
}

I added logging in the @OnTimer method:

String key = keyState.read();
if (key != null && key.equals("xxx")) {
  logger.error(String.format("fired for %s.",
      context.window().maxTimestamp().toDateTime()));
}

Output:

E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.

This does not look like contention: the first and last log entries are
about 1 minute apart. Incidentally, my allowed lateness is also set to
1 minute.
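
To make the numbers concrete, here is a minimal standalone sketch of the
arithmetic. It uses plain java.time rather than the Beam/Joda types in my
pipeline, and the class and method names are made up for illustration. It
computes the event-time timestamp the timer is set for (window max timestamp
plus 1 minute) and the wall-clock gap between the first and last log lines
above:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of Beam.
class TimerTimingSketch {

    // Event-time timestamp the timer is set for:
    // the window's max timestamp plus the given offset.
    static Instant timerTarget(Instant windowMaxTimestamp, Duration offset) {
        return windowMaxTimestamp.plus(offset);
    }

    // Wall-clock gap between two log entries.
    static Duration spread(Instant first, Instant last) {
        return Duration.between(first, last);
    }

    public static void main(String[] args) {
        // Window max timestamp taken from the log output.
        Instant windowMax = Instant.parse("2020-07-15T13:04:59.999Z");
        // Prints 2020-07-15T13:05:59.999Z
        System.out.println(timerTarget(windowMax, Duration.ofMinutes(1)));

        // Earliest and latest log timestamps from the output.
        Instant firstLog = Instant.parse("2020-07-15T23:07:33.925Z");
        Instant lastLog = Instant.parse("2020-07-15T23:08:38.938Z");
        // Prints 65013 (milliseconds, i.e. roughly one minute)
        System.out.println(spread(firstLog, lastLog).toMillis());
    }
}
```

So every firing reports the same window, and the seven firings are spread
over about 65 seconds of processing time.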

Can anyone tell me if I am missing something here? I am using Beam 2.22
with the Dataflow runner.

Thanks!
