Hi Flavio,

In general, deleting the redundant timers is definitely more
memory-friendly.
The reason why in the docs the code is presented the way it is, is:
1) it is mainly for pedagogical purposes, and
2) when the docs were written, Flink mechanism for deleting timers was
    not efficient as it has to iterate over the whole list of registered
timers.

The code you presented seems ok, apart from the `deleteProcessingTimeTimer`
which should be `deleteEventTimeTimer` in the case of the example in the
docs.

Cheers,
Kostas

On Thu, Mar 7, 2019 at 10:44 AM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi to all,
> I was writing a process function similar to the one described in the Flink
> docs at [1].
> Basically I need to set a timeout before emitting elements.
> However, the proposed approach creates a timer for every incoming
> tuple..isn't it dangerous if a key receives a very big burst of events?
> You'll end up having tons of useless registered timers that will fill
> somehow the memory.
> Isn't it better to remove obsolete timers? E.g:
>
> CountWithTimestamp current = state.value();
> if (current == null) {
>      current = new CountWithTimestamp();
>      current.key = value.f0;
>  } else {
>     ctx.timerService().deleteProcessingTimeTimer(current.lastModified +
> timeout);
>  }
>
> Best,
> Flavio
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
>

Reply via email to