Re: DoFn Timer fire multiple times

2020-07-16 Thread Reuven Lax
One thing to keep in mind - any bundle can be executed multiple times, and
that includes timer firings. The runners will guarantee that only one
execution of the timer will "win" (i.e. if you output to another
PCollection from the timer, only one output will end up in that
PCollection). However, if you are logging from the timer callback, you can
see repeated logs. The same holds true for any side-effect operation - e.g.
if you call an external RPC from the timer callback, those calls may be
repeated.
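
A minimal sketch of one way to make such a side effect tolerate repeats,
written in plain Java rather than against Beam so it runs on its own. It
assumes the external service can deduplicate on a caller-supplied key.
IdempotentSink, TimerRetryDemo and the key format are hypothetical names
for illustration only, not Beam or Dataflow APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an external service that deduplicates by key. */
final class IdempotentSink {
  private final Set<String> seenKeys = new HashSet<>();
  private final List<String> applied = new ArrayList<>();

  /** Applies the payload only the first time a given idempotency key is seen. */
  synchronized boolean send(String idempotencyKey, String payload) {
    if (!seenKeys.add(idempotencyKey)) {
      return false; // repeated attempt: ignore it
    }
    applied.add(payload);
    return true;
  }

  /** Returns a copy of the payloads that were actually applied. */
  synchronized List<String> applied() {
    return new ArrayList<>(applied);
  }
}

final class TimerRetryDemo {
  /**
   * Hypothetical body of a timer callback. The idempotency key is derived
   * only from values that are identical on every retry (user key and
   * window end), never from wall-clock time or a random id.
   */
  static boolean onTimer(IdempotentSink sink, String key, String windowEnd) {
    String idempotencyKey = key + "@" + windowEnd;
    return sink.send(idempotencyKey, "flush " + key);
  }

  public static void main(String[] args) {
    IdempotentSink sink = new IdempotentSink();
    // Simulate the same timer firing being executed seven times.
    for (int attempt = 0; attempt < 7; attempt++) {
      onTimer(sink, "xxx", "2020-07-15T13:04:59.999Z");
    }
    System.out.println(sink.applied().size()); // prints 1
  }
}
```

The point of the design is that the key is deterministic per key and
window, so every retried execution of the same firing maps to the same
request on the receiving side.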

On Wed, Jul 15, 2020 at 8:23 PM Kenneth Knowles wrote:

> Hello!
>
> What runner are you using? Does this reproduce on multiple runners? (it is
> very quick to try out your pipeline on DirectRunner and local versions of
> open source runners like Flink, Spark, etc)
>
> If you can produce a complete working reproduction it will be easier for
> someone to debug. I do not see anything wrong with your code. I assumed you
> got the `window` variable out of the ProcessContext (you can also make it
> a parameter to @ProcessElement).
>
> Kenn
>
> On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang wrote:
>
>> Hi,
>>
>> I am trying to set a timer at window expiration time for my use case and
>> expect it to fire just once per key per window.
>> But instead I observe that the onTimer() method gets called multiple
>> times almost all the time.
>>
>> Here's the relevant code snippet:
>>
>> @TimerId(WIN_EXP)
>> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> @StateId(COUNTS)
>> private final StateSpec>> counts =
>> StateSpecs.value();
>>
>> @ProcessElement
>> public void process(
>> ProcessContext context,
>> @StateId(COUNTS) ValueState>
>> countsState,
>> @TimerId(WIN_EXP) Timer winExpTimer) {
>>
>>   ...
>>   Map counts = countsState.read();
>>   if (counts == null) {
>> counts = new HashMap<>();
>> // Only place where I set the timer
>>
>> winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>>   }
>>   ... // no return here and I do not observe exception in the pipeline
>>   countsState.write(counts);
>>   ...
>> }
>>
>> I tried adding logs in OnTimer:
>>
>> String key = keyState.read();
>> if (key != null && key.equals("xxx")) {
>>   logger.error(String.format("fired for %s.",
>> context.window().maxTimestamp().toDateTime()));
>> }
>>
>> Output:
>>
>> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>>
>> This does not seem to be due to contention: the first and last logs are
>> ~1 minute apart. BTW, my allowed lateness is also set to 1 minute.
>>
>> Can anyone let me know if I am missing something here? I am using Beam
>> 2.22 and the Dataflow runner.
>>
>> Thanks!
>>
>>


Re: DoFn Timer fire multiple times

2020-07-15 Thread Kenneth Knowles
Hello!

What runner are you using? Does this reproduce on multiple runners? (it is
very quick to try out your pipeline on DirectRunner and local versions of
open source runners like Flink, Spark, etc)

If you can produce a complete working reproduction it will be easier for
someone to debug. I do not see anything wrong with your code. I assumed you
got the `window` variable out of the ProcessContext (you can also make it
a parameter to @ProcessElement).

Kenn

On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang wrote:

> Hi,
>
> I am trying to set a timer at window expiration time for my use case and
> expect it to fire just once per key per window.
> But instead I observe that the onTimer() method gets called multiple times
> almost all the time.
>
> Here's the relevant code snippet:
>
> @TimerId(WIN_EXP)
> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
> @StateId(COUNTS)
> private final StateSpec>> counts =
> StateSpecs.value();
>
> @ProcessElement
> public void process(
> ProcessContext context,
> @StateId(COUNTS) ValueState>
> countsState,
> @TimerId(WIN_EXP) Timer winExpTimer) {
>
>   ...
>   Map counts = countsState.read();
>   if (counts == null) {
> counts = new HashMap<>();
> // Only place where I set the timer
>
> winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>   }
>   ... // no return here and I do not observe exception in the pipeline
>   countsState.write(counts);
>   ...
> }
>
> I tried adding logs in OnTimer:
>
> String key = keyState.read();
> if (key != null && key.equals("xxx")) {
>   logger.error(String.format("fired for %s.",
> context.window().maxTimestamp().toDateTime()));
> }
>
> Output:
>
> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>
> This does not seem to be due to contention: the first and last logs are
> ~1 minute apart. BTW, my allowed lateness is also set to 1 minute.
>
> Can anyone let me know if I am missing something here? I am using Beam
> 2.22 and the Dataflow runner.
>
> Thanks!
>
>


DoFn Timer fire multiple times

2020-07-15 Thread Zhiheng Huang
Hi,

I am trying to set a timer at window expiration time for my use case and
expect it to fire just once per key per window.
But instead I observe that the onTimer() method gets called multiple times
almost all the time.

Here's the relevant code snippet:

@TimerId(WIN_EXP)
private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@StateId(COUNTS)
private final StateSpec>> counts =
StateSpecs.value();

@ProcessElement
public void process(
ProcessContext context,
@StateId(COUNTS) ValueState>
countsState,
@TimerId(WIN_EXP) Timer winExpTimer) {

  ...
  Map counts = countsState.read();
  if (counts == null) {
counts = new HashMap<>();
// Only place where I set the timer

winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
  }
  ... // no return here and I do not observe exception in the pipeline
  countsState.write(counts);
  ...
}

I tried adding logs in OnTimer:

String key = keyState.read();
if (key != null && key.equals("xxx")) {
  logger.error(String.format("fired for %s.",
context.window().maxTimestamp().toDateTime()));
}

Output:

E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.

This does not seem to be due to contention: the first and last logs are
~1 minute apart. BTW, my allowed lateness is also set to 1 minute.

Can anyone let me know if I am missing something here? I am using Beam 2.22
and the Dataflow runner.

Thanks!