Re: How to use onTimer() on event stream for *ProcessFunction?

2021-06-17 Thread Felipe Gutierrez
I didn't know that I don't need to implement CheckpointedFunction if I use
ListState. However, I was considering this answer
(https://stackoverflow.com/a/47443537/2096986), where Fabian says:

"You can store parts of the operator state also in the ListState (instead
of holding it on the heap) but it will be quite expensive to access
individual elements because you have to traverse an iterator."

So maybe implementing CheckpointedFunction and saving the state in
snapshotState() is still better? Or not? What do you think?

Could you also shed some light on this related question about
CoProcessFunction and event-time triggers after a job failure?
(https://lists.apache.org/x/thread.html/r5f74099a7b91b4ad47ac7612631f7e03d08c0e1d374487da55aa1a31@%3Cuser.flink.apache.org%3E)

Thank you very much!
Felipe



On Thu, Jun 17, 2021 at 4:38 PM Arvid Heise wrote:

> Let's start in reverse: you don't need to implement CheckpointedFunction
> if you use managed state (ListState is managed).


Re: How to use onTimer() on event stream for *ProcessFunction?

2021-06-17 Thread Arvid Heise
Let's start in reverse: you don't need to implement CheckpointedFunction if
you use managed state (ListState is managed).

Now to the question of how you should implement onTimer. That's up to you
and heavily depends on your use case.
The first onTimer implementation is called when event time reaches 60s past
the timestamp of an element of key X. If you receive additional elements
with key X, you get additional timers. However, in this example, only the
latest timer should actually output data (think of some session detection).
That's why the implementation checks whether it is indeed the latest timer
before outputting elements.

The other implementation always outputs elements, regardless of any
additional timers/elements being added.
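A minimal plain-Java sketch (no Flink dependency) of the two onTimer() styles described above, for a single key X. Several things here are stand-ins rather than Flink API. A TreeSet plays the timer service; Flink likewise keeps at most one timer per key and timestamp. A single long plays the keyed `lastModified` state. The element timestamps are made up. The sketch also assumes all elements arrive before the watermark reaches any of the timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TimerStylesSketch {
    static final long TIMEOUT_MS = 60_000L; // the 60 s timeout from example [1]

    /** Returns the timestamps of the timers that would emit output for one key. */
    static List<Long> simulate(long[] elementTimestamps, boolean latestTimerOnly) {
        long lastModified = Long.MIN_VALUE;       // stands in for result.lastModified in keyed state
        TreeSet<Long> timers = new TreeSet<>();   // one timer per timestamp, fired in ascending order
        for (long ts : elementTimestamps) {       // processElement(): update state, register a timer
            lastModified = ts;
            timers.add(ts + TIMEOUT_MS);
        }
        List<Long> emitted = new ArrayList<>();
        for (long timestamp : timers) {           // onTimer() for each registered timer
            boolean isLatest = timestamp == lastModified + TIMEOUT_MS;
            if (isLatest || !latestTimerOnly) {
                emitted.add(timestamp);           // out.collect(...) in the real function
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] elements = {0L, 10_000L, 20_000L};
        System.out.println("latest-timer check: " + simulate(elements, true));
        System.out.println("always emit:        " + simulate(elements, false));
    }
}
```

With elements at 0, 10000 and 20000 ms, the latest-timer check emits only for the timer at 80000. The unconditional version emits for all three timers (60000, 70000, 80000).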

On Wed, Jun 16, 2021 at 4:08 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Which one is correct to use? Or which one is best?
>
> Afterwards, I have to make this function fault tolerant, so my next
> question is: do I have to implement CheckpointedFunction and
> CheckpointedRestoring, or does it already recover from failures because
> I am using ListState?


How to use onTimer() on event stream for *ProcessFunction?

2021-06-16 Thread Felipe Gutierrez
Hi community,

I don't understand why the KeyedProcessFunction.onTimer() implementation in
[1] is different from the one in [2]. Both use a KeyedProcessFunction and
aim to fire a window on event time. In [1], results are emitted only
if (timestamp == result.lastModified + 60000), and the timer is scheduled
relative to ctx.timestamp().

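public void processElement(...) {
    // ...
    current.lastModified = ctx.timestamp();
    // write the state back
    state.update(current);
    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}

public void onTimer(long timestamp, OnTimerContext ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {
    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();
    // check if this is an outdated timer or the latest timer
    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}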

In [2], there is no timestamp comparison in the onTimer() method. In
addition, the timers are scheduled with a formula, (eventTime - (eventTime %
durationMsec) + durationMsec - 1), and only if the event is not late with
respect to the watermark (an event is late if eventTime <=
timerService.currentWatermark()).

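public void processElement(...) {
    long eventTime = fare.getEventTime();
    TimerService timerService = ctx.timerService();
    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
    } else {
        // Round up eventTime to the end of the window containing this event.
        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
        // Schedule a callback for when the window has been completed.
        timerService.registerEventTimeTimer(endOfWindow);
        // ... add this fare's tip to sumOfTips for endOfWindow
    }
}

public void onTimer(long timestamp, OnTimerContext context,
        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
    // Look up the result for the window that just ended.
    Float sumOfTips = this.sumOfTips.get(timestamp);
    // ...
}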
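A plain-Java sketch (no Flink dependency) of the window bookkeeping in approach [2]. Each event is assigned to the tumbling window that ends at `endOfWindow`, and a running sum is kept per window end. A TreeMap stands in for the `sumOfTips` MapState. The one-hour window size and the sample tips are assumptions, and the formula as written assumes non-negative timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowEndSketch {
    /** Last millisecond of the tumbling window containing eventTime (assumes eventTime >= 0). */
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    /** Running sum per window end; the TreeMap stands in for the sumOfTips MapState. */
    static Map<Long, Float> sumPerWindow(long[] eventTimes, float[] values, long durationMsec) {
        Map<Long, Float> sums = new TreeMap<>();
        for (int i = 0; i < eventTimes.length; i++) {
            // the window end doubles as the event-time timer timestamp
            sums.merge(endOfWindow(eventTimes[i], durationMsec), values[i], Float::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L; // assumed one-hour windows
        long[] eventTimes = {1_000L, 1_800_000L, 3_600_000L, 5_400_000L};
        float[] tips = {1.0f, 2.0f, 4.0f, 8.0f};
        System.out.println(sumPerWindow(eventTimes, tips, hour));
    }
}
```

This prints {3599999=3.0, 7199999=12.0}. An event at exactly 3600000 falls into the second window, because each window covers [start, start + durationMsec - 1]. Because the key is the timer timestamp, onTimer() can look up the finished window with `get(timestamp)` and needs no extra comparison.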


My use case uses a CoProcessFunction, and I am keeping the state in
ListState. It works fine with approach [1]. When I used approach [2], some
of the events were late because of the watermark.
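The thread does not state which watermark strategy this job uses. Under stated assumptions, a plain-Java sketch can show one possible way approach [2] ends up treating events as late. First, the watermark follows maxTimestamp - outOfOrderness - 1, which is the formula behind Flink's bounded-out-of-orderness strategy. Second, as a simplification, the watermark is updated after every event; Flink emits it periodically. Any event that arrives out of order by more than the bound satisfies eventTime <= currentWatermark and is skipped by the check in [2]. The snippet from [1] has no such check. The timestamps below are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class LateEventSketch {
    /** Returns the events that approach [2] would classify as late. */
    static List<Long> lateEvents(long[] eventTimes, long outOfOrdernessMs) {
        // initial value chosen so the watermark starts at Long.MIN_VALUE without underflow
        long maxSeen = Long.MIN_VALUE + outOfOrdernessMs + 1;
        long currentWatermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (long eventTime : eventTimes) {
            if (eventTime <= currentWatermark) {
                late.add(eventTime);              // approach [2] skips this event
            }
            maxSeen = Math.max(maxSeen, eventTime);
            // simplification: watermark advanced after every event, not periodically
            currentWatermark = maxSeen - outOfOrdernessMs - 1;
        }
        return late;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 5_000L, 3_000L, 7_000L, 6_500L}; // out-of-order input
        System.out.println("bound 0 ms:    " + lateEvents(eventTimes, 0L));
        System.out.println("bound 2000 ms: " + lateEvents(eventTimes, 2_000L));
    }
}
```

With no out-of-orderness allowance, the events at 3000 and 6500 fall behind the watermark. A 2000 ms bound admits both. The trade-off is that every window result is then delayed by that bound.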

Which one is correct to use? Or which one is best?

Afterwards, I have to make this function fault tolerant, so my next
question is: do I have to implement CheckpointedFunction and
CheckpointedRestoring, or does it already recover from failures because
I am using ListState?

Thanks,
Felipe


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez