Re: Recover watermark from savepoint

2022-07-05 Thread Sweta Kalakuntla
Hi Thias,

Thank you for providing a detailed explanation. We did something similar.

The job is configured to accept no late events, aggregates every 20 minutes,
and sends out the value. So we save the last processed window per key in
state. During aggregation, if the current window is later than the last
processed window, we process it; otherwise we discard it. This check also
applies after recovery from a savepoint, so any aggregations caused by late
events are discarded.
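Below is a minimal sketch of the per-key check described above. It is written in plain Java so it runs without Flink. The 20-minute tumbling windows, the `String` key, and the class name `LastWindowFilter` are assumptions made for illustration. In a real job, the `HashMap` would be keyed `ValueState<Long>` inside a `KeyedProcessFunction`, and that is what lets the check survive a savepoint restore.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of a per-key "last processed window" check.
// In a Flink job the map would be keyed ValueState<Long>, which is part of
// checkpoints and savepoints, so the check still applies after a restore.
class LastWindowFilter {
    // Assumed window size: 20 minutes, in milliseconds.
    static final long WINDOW_SIZE_MS = 20 * 60 * 1000L;

    // Stand-in for keyed state: key -> start of the last processed window.
    private final Map<String, Long> lastWindowStart = new HashMap<>();

    // Start of the tumbling window containing a (non-negative) timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Returns true if the aggregate for this window should be sent out,
    // false if it is not newer than the last processed window for the key.
    boolean accept(String key, long windowStartMs) {
        Long last = lastWindowStart.get(key);
        if (last != null && windowStartMs <= last) {
            return false; // discard: window is not later than the last one
        }
        lastWindowStart.put(key, windowStartMs);
        return true;
    }

    public static void main(String[] args) {
        LastWindowFilter filter = new LastWindowFilter();
        long later = windowStart(25 * 60 * 1000L);  // window [20 min, 40 min)
        long earlier = windowStart(5 * 60 * 1000L); // window [0, 20 min)
        System.out.println(filter.accept("sensor-1", later));   // true
        System.out.println(filter.accept("sensor-1", earlier)); // false
        System.out.println(filter.accept("sensor-2", earlier)); // true
    }
}
```

The comparison is strict, as in the description. A second aggregate for a window that was already processed is therefore discarded too.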

Thank you,
Sweta






RE: Recover watermark from savepoint

2022-06-09 Thread Schwalbe Matthias
Hi Sweta,

It is actually a sound idea to implement a dedicated process function for this
purpose, as David suggests, especially if you are in a situation where waiting
for a valid natural watermark after restoring from a savepoint is not
sufficient.

We had a situation with input streams of different update frequencies (one only 
updated once a day, and hence only generated watermarks once a day).

This is how you can approach the specific tasks of storing and restoring
watermarks:

  *   watermark storing:
     *   Create a process function
     *   Create a map that stores the latest watermark per sub-partition
         (key group), i.e. there are 128 sub-partitions in a job with a
         max-parallelism of 128
     *   Store this map in operator state with each checkpoint
     *   Create a repeating processing-time timer (firing as often as your
         needs require) in order to emit a watermark after a savepoint restore
  *   watermark restoring:
     *   When restoring from operator state (the parallelism might have
         changed), determine the lowest watermark among all sub-partitions
         that belong to the respective subtask
     *   Emit this watermark (once) in the processing-time handler of the
         timer above
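The storing and restoring steps above can be sketched as a plain-Java simulation. The sketch rests on three assumptions:

* The "sub-partitions" are Flink key groups.
* The map is kept in union operator state, so every subtask sees all entries on restore.
* The key-group range formula is the one Flink's `KeyGroupRangeAssignment` uses, as far as we know.

The class and method names are hypothetical. The sketch does not show how the watermark is actually forwarded downstream. As far as we know, a plain `ProcessFunction` cannot emit watermarks through its `Collector`, so that part may need a custom operator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of storing and restoring watermarks per key group
// ("sub-partition"). All names here are hypothetical.
class KeyGroupWatermarks {
    // Latest watermark seen per key group owned by this subtask.
    private final Map<Integer, Long> latestPerKeyGroup = new HashMap<>();
    // Restored watermark that has not been emitted yet.
    private Long pendingWatermark = null;

    // First/last key group of a subtask; we believe this matches Flink's
    // KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
    static int rangeStart(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    static int rangeEnd(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    // Storing: remember the highest watermark observed for a key group.
    void observe(int keyGroup, long watermark) {
        latestPerKeyGroup.merge(keyGroup, watermark, Long::max);
    }

    // Storing: the entries this subtask would add to operator state
    // (assumed to be union state) at each checkpoint.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(latestPerKeyGroup);
    }

    // Restoring: from the entries of all old subtasks, take the lowest
    // watermark among the key groups this subtask owns now. Returns
    // Long.MIN_VALUE if none of its key groups had a stored watermark.
    static long restoredWatermark(List<Map<Integer, Long>> allSnapshots,
                                  int maxParallelism, int parallelism, int subtask) {
        int start = rangeStart(maxParallelism, parallelism, subtask);
        int end = rangeEnd(maxParallelism, parallelism, subtask);
        long min = Long.MAX_VALUE;
        boolean found = false;
        for (Map<Integer, Long> snapshot : allSnapshots) {
            for (Map.Entry<Integer, Long> entry : snapshot.entrySet()) {
                if (entry.getKey() >= start && entry.getKey() <= end) {
                    min = Math.min(min, entry.getValue());
                    found = true;
                }
            }
        }
        return found ? min : Long.MIN_VALUE;
    }

    // Restoring: keep the restored watermark until the timer fires.
    void setPendingWatermark(long watermark) {
        pendingWatermark = watermark;
    }

    // Called from the repeating processing-time timer: returns the restored
    // watermark exactly once, then null on every later call.
    Long pollPendingWatermark() {
        Long watermark = pendingWatermark;
        pendingWatermark = null;
        return watermark;
    }
}
```

Taking the minimum is the conservative choice. After a rescale, a subtask may own key groups that previously lived on different subtasks with different watermarks. Emitting the lowest one avoids moving any of them ahead of where it actually was.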

Feel free to ask for details; I hope this helps. I need to ask my folks whether
I can share our implementation (20-odd lines of code).

What do you think?

Thias



This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Re: Recover watermark from savepoint

2022-06-09 Thread David Anderson
Sweta,

Flink does not include watermarks in savepoints, nor are they included in
aligned checkpoints. For what it's worth, I believe that with unaligned
checkpoints, in-flight watermarks are included in checkpoints, but I don't
believe that would solve the problem, since the watermark strategy's state
is still lost during a restart.

I can't think of any way to guarantee that all possibly late events will be
deterministically identified as late. The commonly used
bounded-out-of-orderness watermark strategy doesn't guarantee this either,
even without a restart (because watermarks are delayed by the auto
watermark interval, rather than being produced at every conceivable
opportunity).
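To illustrate the point about periodic emission, here is a small plain-Java simulation of a bounded-out-of-orderness watermark. Three things are assumptions:

* The formula `maxTimestamp - outOfOrderness - 1` is, as we understand it, what Flink's built-in strategy emits.
* Emitting "every N events" is a simplified stand-in for the auto-watermark interval.
* The class and method names are hypothetical.

```java
import java.util.Arrays;

// Plain-Java simulation: which events are behind the watermark when a
// bounded-out-of-orderness watermark is emitted after every event versus
// only periodically. Names are hypothetical.
class PeriodicWatermarkDemo {
    // Watermark for the highest timestamp seen so far (as we understand
    // Flink's bounded-out-of-orderness strategy).
    static long watermarkFor(long maxTimestamp, long outOfOrderness) {
        return maxTimestamp - outOfOrderness - 1;
    }

    // timestamps: event times in arrival order.
    // emitEvery: emit a watermark after every N events; a simplified stand-in
    // for the auto-watermark interval (1 = after every event).
    // Returns, per event, whether its timestamp is <= the current watermark.
    static boolean[] lateFlags(long[] timestamps, long outOfOrderness, int emitEvery) {
        boolean[] late = new boolean[timestamps.length];
        long maxTimestamp = Long.MIN_VALUE + outOfOrderness + 1; // avoids underflow
        long currentWatermark = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            late[i] = timestamps[i] <= currentWatermark;
            maxTimestamp = Math.max(maxTimestamp, timestamps[i]);
            if ((i + 1) % emitEvery == 0) {
                currentWatermark = Math.max(currentWatermark,
                        watermarkFor(maxTimestamp, outOfOrderness));
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] timestamps = {100, 200, 150, 185};
        // Watermark after every event: 150 and 185 are behind watermark 189.
        System.out.println(Arrays.toString(lateFlags(timestamps, 10, 1)));
        // prints [false, false, true, true]
        // Watermark only after every third event: 150 slips through.
        System.out.println(Arrays.toString(lateFlags(timestamps, 10, 3)));
        // prints [false, false, false, true]
    }
}
```

Consider the event with timestamp 150:

* With a watermark after every event, it arrives behind the watermark (189) and counts as late.
* With periodic emission, it arrives before any watermark has been emitted and is processed normally.

The outcome therefore depends on timing rather than being deterministic.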

If this is a strong requirement, you could decide not to rely on watermarks
for dropping late events, and implement the logic yourself in a process
function.

Best,
David

On Wed, Jun 8, 2022 at 6:10 PM Sweta Kalakuntla 
wrote:

> Hi,
>
> I want to understand whether Flink saves the watermark in a savepoint and,
> if not, how we can achieve this.
>
> We are seeing an issue where, on recovery, the job processes some late
> events that would have been discarded had the job been running without
> any downtime.
>
> Thank you,
> Sweta
>