Re: stream processing savepoints and watermarks question

2021-09-24 Thread Marco Villalobos
Everybody,

Thank you for the quick response.

Yes, we inadvertently used the -d/--drain flag when stopping the job. We
were not aware that it would cause a MAX_WATERMARK to roll through our
system.

A MAX_WATERMARK is catastrophic for the event-time timers we have in our
system.

We now know never to use -d in this situation.

Again, thank you.

-Marco

On Thu, Sep 23, 2021 at 11:01 PM JING ZHANG  wrote:

> Hi Marco,
> Did you specify the --drain flag when stopping the job with a savepoint?
> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
> before the last checkpoint barrier.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> Best,
> JING ZHANG
>
> Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:
>
>> Something strange happened today.
>> When we tried to shut down a job with a savepoint, the watermarks became
>> equal to 2^63 - 1.
>>
>> This caused timers to fire indefinitely and crash downstream systems by
>> overwhelming them with incorrect data.
>>
>> We are using event time processing with Kafka as our source.
>>
>> It seems impossible for a watermark to be that large.
>>
>> I know it's possible when streaming with batch execution mode, but this was
>> stream processing.
>>
>> What can cause this?  Is this normal behavior when creating a savepoint?
>>
>


RE: stream processing savepoints and watermarks question

2021-09-24 Thread Schwalbe Matthias
Hi all,

The way I understand the matter is that the described behavior is intentional
for event-time timers:

  *   When called, an event-time timer handler can register new timers.
  *   The timestamp parameter of the handler (override def onTimer(timestamp: Long,
      ctx: KeyedProcessFunction[K, I, O]#OnTimerContext, out: Collector[O]): Unit = { ...)
      is set to the timeout time of the timer, not to the watermark that caused
      the timeout!
  *   onTimer, as said, can register new timers; the timeout time can even be in
      the past.
  *   In that case the timeout is handled immediately after the current onTimer(...)
      call.
  *   As long as onTimer keeps registering new timers, the operator will keep
      iterating through all of them.

When an operator receives a watermark, it fires all registered timers with
timeout <= watermark, in timeout order, including the ones registered from within
onTimer().

This is especially the case for a MAX_WATERMARK, but the same holds for any
watermark that lies in the future.

For your case, @Marco, you could break this pattern by comparing the timeout to
be registered against the current processing time and, if it lies too far in the
future, not registering the timer.
That would break the infinite iteration over timers ...
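
A minimal sketch of what such a guard could look like, assuming a
KeyedProcessFunction whose onTimer re-registers itself every intervalMs; the
class name and the intervalMs/maxDriftMs parameters are illustrative, not taken
from Marco's job:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Illustrative self-re-registering event-time timer with a guard that stops
// the chain when the next timeout lies too far beyond current processing time
// (as it would after a MAX_WATERMARK from `stop --drain` fires the timer early).
class GuardedPeriodicTimer[K, I](intervalMs: Long, maxDriftMs: Long)
    extends KeyedProcessFunction[K, I, String] {

  override def processElement(
      value: I,
      ctx: KeyedProcessFunction[K, I, String]#Context,
      out: Collector[String]): Unit = {
    // Start the periodic chain relative to the element's event time
    // (assumes the source assigns event-time timestamps).
    val eventTime: Long = ctx.timestamp()
    ctx.timerService().registerEventTimeTimer(eventTime + intervalMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[K, I, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(s"timer fired for key ${ctx.getCurrentKey} at $timestamp")

    // Guard: only re-register if the next timeout is not too far ahead of
    // processing time; after a MAX_WATERMARK every newly registered timer
    // would otherwise be immediately due again, looping forever.
    val nextTimeout = timestamp + intervalMs
    if (nextTimeout <= ctx.timerService().currentProcessingTime() + maxDriftMs) {
      ctx.timerService().registerEventTimeTimer(nextTimeout)
    }
  }
}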

I believe the behavior exhibited by Flink is intentional and not a defect!

What do you think?

Thias


From: JING ZHANG 
Sent: Friday, 24 September 2021 12:25
To: Guowei Ma 
Cc: Marco Villalobos ; user 
Subject: Re: stream processing savepoints and watermarks question

Hi Guowei,
Thanks for the quick response; maybe I didn't express it clearly in the last email.
In fact, the above case happened in reality; it is not something I imagined.
When MAX_WATERMARK is received, the operator will try to fire all registered
event-time timers. However, in the above case, new timers are continuously being
registered.
I will try to reproduce the problem in an ITCase and will provide the code once
it is complete.

Best,
JING ZHANG

Guowei Ma <guowei@gmail.com> wrote on Fri, Sep 24, 2021 at 5:16 PM:
Hi JING,

Thanks for the case.
But I am not sure this would happen. As far as I know, an event-time timer can
only be triggered when a watermark arrives (except in the "quiesce phase").
I think the watermark cannot advance any further after MAX_WATERMARK is received.

Best,
Guowei


On Fri, Sep 24, 2021 at 4:31 PM JING ZHANG <beyond1...@gmail.com> wrote:
Hi Guowei,
I can provide a case I have encountered where timers fire indefinitely when
stopping with a drain savepoint.
After an event timer is triggered, it registers another event timer whose
timestamp equals the timestamp of the triggered timer plus an interval.
If a MAX_WATERMARK comes, the timer is triggered and registers another timer,
which is triggered in turn, and so on forever.
I'm not sure whether Marco has hit a similar problem.

Best,
JING ZHANG



Guowei Ma <guowei@gmail.com> wrote on Fri, Sep 24, 2021 at 4:01 PM:
Hi Marco,

Indeed, as mentioned by JING, if you drain when triggering the savepoint, you
will encounter this MAX_WATERMARK.
But I have a question: in theory, even with MAX_WATERMARK, there should not be
an infinite number of timers, and these timers should be generated by the
application code.
Could you share your code, if it is convenient for you?

Best,
Guowei


On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG <beyond1...@gmail.com> wrote:
Hi Marco,
Did you specify the --drain flag when stopping the job with a savepoint?
If the --drain flag is specified, then a MAX_WATERMARK will be emitted before 
the last checkpoint barrier.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

Best,
JING ZHANG

Marco Villalobos <mvillalo...@kineteque.com> wrote on Fri, Sep 24, 2021 at 12:54 PM:
Something strange happened today.
When we tried to shut down a job with a savepoint, the watermarks became equal
to 2^63 - 1.

This caused timers to fire indefinitely and crash downstream systems by
overwhelming them with incorrect data.

We are using event time processing with Kafka as our source.

It seems impossible for a watermark to be that large.

I know it's possible when streaming with batch execution mode, but this was stream
processing.

What can cause this?  Is this normal behavior when creating a savepoint?

Re: stream processing savepoints and watermarks question

2021-09-24 Thread JING ZHANG
Hi Guowei,
Thanks for the quick response; maybe I didn't express it clearly in the last
email.
In fact, the above case happened in reality; it is not something I imagined.
When MAX_WATERMARK is received, the operator will try to fire all
registered event-time timers. However, in the above case, new timers are
continuously being registered.
I will try to reproduce the problem in an ITCase and will provide the code once
it is complete.

Best,
JING ZHANG

Guowei Ma wrote on Fri, Sep 24, 2021 at 5:16 PM:

> Hi JING,
>
> Thanks for the case.
> But I am not sure this would happen. As far as I know, an event-time timer
> can only be triggered when a watermark arrives (except in the "quiesce
> phase").
> I think the watermark cannot advance any further after MAX_WATERMARK is
> received.
>
> Best,
> Guowei
>
>
> On Fri, Sep 24, 2021 at 4:31 PM JING ZHANG  wrote:
>
>> Hi Guowei,
>> I can provide a case I have encountered where timers fire indefinitely when
>> stopping with a drain savepoint.
>> After an event timer is triggered, it registers another event timer whose
>> timestamp equals the timestamp of the triggered timer plus an interval.
>> If a MAX_WATERMARK comes, the timer is triggered and registers another timer,
>> which is triggered in turn, and so on forever.
>> I'm not sure whether Marco has hit a similar problem.
>>
>> Best,
>> JING ZHANG
>>
>>
>>
>> Guowei Ma wrote on Fri, Sep 24, 2021 at 4:01 PM:
>>
>>> Hi Marco,
>>>
>>> Indeed, as mentioned by JING, if you drain when triggering the savepoint,
>>> you will encounter this MAX_WATERMARK.
>>> But I have a question: in theory, even with MAX_WATERMARK, there should not
>>> be an infinite number of timers, and these timers should be generated by
>>> the application code.
>>> Could you share your code, if it is convenient for you?
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:
>>>
 Hi Marco,
 Did you specify the --drain flag when stopping the job with a savepoint?
 If the --drain flag is specified, then a MAX_WATERMARK will be emitted
 before the last checkpoint barrier.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

 Best,
 JING ZHANG

 Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:

> Something strange happened today.
> When we tried to shut down a job with a savepoint, the watermarks
> became equal to 2^63 - 1.
>
> This caused timers to fire indefinitely and crash downstream systems
> by overwhelming them with incorrect data.
>
> We are using event time processing with Kafka as our source.
>
> It seems impossible for a watermark to be that large.
>
> I know it's possible when streaming with batch execution mode, but this was
> stream processing.
>
> What can cause this?  Is this normal behavior when creating a
> savepoint?
>



Re: stream processing savepoints and watermarks question

2021-09-24 Thread Guowei Ma
Hi JING,

Thanks for the case.
But I am not sure this would happen. As far as I know, an event-time timer can
only be triggered when a watermark arrives (except in the "quiesce phase").
I think the watermark cannot advance any further after MAX_WATERMARK is received.

Best,
Guowei


On Fri, Sep 24, 2021 at 4:31 PM JING ZHANG  wrote:

> Hi Guowei,
> I can provide a case I have encountered where timers fire indefinitely when
> stopping with a drain savepoint.
> After an event timer is triggered, it registers another event timer whose
> timestamp equals the timestamp of the triggered timer plus an interval.
> If a MAX_WATERMARK comes, the timer is triggered and registers another timer,
> which is triggered in turn, and so on forever.
> I'm not sure whether Marco has hit a similar problem.
>
> Best,
> JING ZHANG
>
>
>
> Guowei Ma wrote on Fri, Sep 24, 2021 at 4:01 PM:
>
>> Hi Marco,
>>
>> Indeed, as mentioned by JING, if you drain when triggering the savepoint,
>> you will encounter this MAX_WATERMARK.
>> But I have a question: in theory, even with MAX_WATERMARK, there should not
>> be an infinite number of timers, and these timers should be generated by
>> the application code.
>> Could you share your code, if it is convenient for you?
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:
>>
>>> Hi Marco,
>>> Did you specify the --drain flag when stopping the job with a savepoint?
>>> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
>>> before the last checkpoint barrier.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:
>>>
 Something strange happened today.
 When we tried to shut down a job with a savepoint, the watermarks became
 equal to 2^63 - 1.

 This caused timers to fire indefinitely and crash downstream systems
 by overwhelming them with incorrect data.

 We are using event time processing with Kafka as our source.

 It seems impossible for a watermark to be that large.

 I know it's possible when streaming with batch execution mode, but this was
 stream processing.

 What can cause this?  Is this normal behavior when creating a savepoint?

>>>


Re: stream processing savepoints and watermarks question

2021-09-24 Thread JING ZHANG
Hi Guowei,
I can provide a case I have encountered where timers fire indefinitely when
stopping with a drain savepoint.
After an event timer is triggered, it registers another event timer whose
timestamp equals the timestamp of the triggered timer plus an interval.
If a MAX_WATERMARK comes, the timer is triggered and registers another timer,
which is triggered in turn, and so on forever.
I'm not sure whether Marco has hit a similar problem.
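
For illustration only (this is not JING ZHANG's actual ITCase), a minimal sketch
of such a self-re-registering timer; the class name and intervalMs parameter are
assumptions. It is the unguarded counterpart of the guarded sketch in Thias's
reply earlier in this thread:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Each firing schedules the next firing at timestamp + intervalMs. Once a
// MAX_WATERMARK arrives (e.g. from `stop --drain`), every newly registered
// timer is already <= the watermark, so the operator keeps firing forever.
class SelfReregisteringTimer[K, I](intervalMs: Long)
    extends KeyedProcessFunction[K, I, String] {

  override def processElement(
      value: I,
      ctx: KeyedProcessFunction[K, I, String]#Context,
      out: Collector[String]): Unit = {
    // Assumes the source assigns event-time timestamps.
    val eventTime: Long = ctx.timestamp()
    ctx.timerService().registerEventTimeTimer(eventTime + intervalMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[K, I, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(s"timer fired for key ${ctx.getCurrentKey} at $timestamp")
    // Unconditionally register the next timer -- this is the looping pattern.
    ctx.timerService().registerEventTimeTimer(timestamp + intervalMs)
  }
}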

Best,
JING ZHANG



Guowei Ma wrote on Fri, Sep 24, 2021 at 4:01 PM:

> Hi Marco,
>
> Indeed, as mentioned by JING, if you drain when triggering the savepoint,
> you will encounter this MAX_WATERMARK.
> But I have a question: in theory, even with MAX_WATERMARK, there should not
> be an infinite number of timers, and these timers should be generated by
> the application code.
> Could you share your code, if it is convenient for you?
>
> Best,
> Guowei
>
>
> On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:
>
>> Hi Marco,
>> Did you specify the --drain flag when stopping the job with a savepoint?
>> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
>> before the last checkpoint barrier.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>>
>> Best,
>> JING ZHANG
>>
>> Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:
>>
>>> Something strange happened today.
>>> When we tried to shut down a job with a savepoint, the watermarks became
>>> equal to 2^63 - 1.
>>>
>>> This caused timers to fire indefinitely and crash downstream systems
>>> by overwhelming them with incorrect data.
>>>
>>> We are using event time processing with Kafka as our source.
>>>
>>> It seems impossible for a watermark to be that large.
>>>
>>> I know it's possible when streaming with batch execution mode, but this was
>>> stream processing.
>>>
>>> What can cause this?  Is this normal behavior when creating a savepoint?
>>>
>>


Re: stream processing savepoints and watermarks question

2021-09-24 Thread Guowei Ma
Hi Marco,

Indeed, as mentioned by JING, if you drain when triggering the savepoint, you
will encounter this MAX_WATERMARK.
But I have a question: in theory, even with MAX_WATERMARK, there should not be
an infinite number of timers, and these timers should be generated by the
application code.
Could you share your code, if it is convenient for you?

Best,
Guowei


On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:

> Hi Marco,
> Did you specify the --drain flag when stopping the job with a savepoint?
> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
> before the last checkpoint barrier.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> Best,
> JING ZHANG
>
> Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:
>
>> Something strange happened today.
>> When we tried to shut down a job with a savepoint, the watermarks became
>> equal to 2^63 - 1.
>>
>> This caused timers to fire indefinitely and crash downstream systems by
>> overwhelming them with incorrect data.
>>
>> We are using event time processing with Kafka as our source.
>>
>> It seems impossible for a watermark to be that large.
>>
>> I know it's possible when streaming with batch execution mode, but this was
>> stream processing.
>>
>> What can cause this?  Is this normal behavior when creating a savepoint?
>>
>


Re: stream processing savepoints and watermarks question

2021-09-24 Thread JING ZHANG
Hi Marco,
Did you specify the --drain flag when stopping the job with a savepoint?
If the --drain flag is specified, then a MAX_WATERMARK will be emitted
before the last checkpoint barrier.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

Best,
JING ZHANG

Marco Villalobos wrote on Fri, Sep 24, 2021 at 12:54 PM:

> Something strange happened today.
> When we tried to shut down a job with a savepoint, the watermarks became
> equal to 2^63 - 1.
>
> This caused timers to fire indefinitely and crash downstream systems by
> overwhelming them with incorrect data.
>
> We are using event time processing with Kafka as our source.
>
> It seems impossible for a watermark to be that large.
>
> I know it's possible when streaming with batch execution mode, but this was
> stream processing.
>
> What can cause this?  Is this normal behavior when creating a savepoint?
>