Re: stream processing savepoints and watermarks question
Everybody, thank you for the quick response.

Yes, we inadvertently used the -d/--drain flag when stopping the job. We were not aware that it would cause a MAX_WATERMARK to roll through our system. MAX_WATERMARKs are catastrophic for the event-time timers we have in our system. We now know never to use -d in this situation.

Again, thank you.

-Marco
RE: stream processing savepoints and watermarks question
Hi all,

The way I understand the matter, the described behavior is intentional for event-time timers:

* When called, an event-time timer handler can register new timers.
* The timestamp parameter of the handler (override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[K, I, O]#OnTimerContext, out: Collector[O]): Unit = { … }) is set to the timeout time of the timer, not to the watermark that caused the timeout.
* onTimer, as said, can register new timers; the timeout time can even be in the past. In that case the timeout is handled immediately after the current onTimer(…) call.
* As long as onTimer registers new timers, the operator will keep iterating through all of them.

When an operator receives a watermark, it fires all registered timers with timeout <= watermark, in timeout order, including the ones registered in onTimer(). This is especially the case for a MAX_WATERMARK, but it is the same for any watermark that lies in the future.

For your case, @Marco, you could break this pattern by comparing the timeout to be registered with the current processing time, and if it lies safely too far in the future, not register the timeout. That would break the infinite iteration over timers.

I believe the behavior exhibited by Flink is intentional and not a defect. What do you think?

Thias
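The guard suggested above can be sketched without Flink as a small plain-Java simulation. The class name and the constants INTERVAL_MS and MAX_DRIFT_MS are illustrative assumptions, and the TreeSet merely stands in for the operator's internal timer queue:

```java
import java.util.TreeSet;

// Plain-Java sketch (no Flink dependency) of the guard described above:
// stop re-registering timers once they drift too far past processing time.
public class DrainTimerGuard {

    static final long INTERVAL_MS = 60_000L;      // timer re-registration interval (assumed)
    static final long MAX_DRIFT_MS = 86_400_000L; // "safely too far in the future": one day (assumed)

    // Fire all timers with timeout <= watermark, in timeout order; the handler
    // re-registers the next timer unless it would drift too far past "now".
    static long fire(long watermark, long now) {
        TreeSet<Long> timers = new TreeSet<>();
        timers.add(now + INTERVAL_MS);
        long fired = 0;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timeout = timers.pollFirst();
            fired++;
            long next = timeout + INTERVAL_MS;
            if (next - now <= MAX_DRIFT_MS) {     // the guard: compare against processing time
                timers.add(next);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Without the guard, MAX_WATERMARK would drain this queue forever;
        // with it, the chain stops after one day's worth of timers.
        System.out.println(fire(Long.MAX_VALUE, 0L)); // prints 1440
    }
}
```

With the guard in place, even MAX_WATERMARK only fires a bounded number of timers (one day's worth at a one-minute interval), so the drain can complete.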
Re: stream processing savepoints and watermarks question
Hi Guowei,

Thanks for the quick response; maybe I didn't express it clearly in the last email. In fact, the above case happened in reality; it is not something I imagined. When MAX_WATERMARK is received, the operator tries to fire all registered event-time timers. However, in the above case, new timers are continually being registered. I will try to reproduce the problem in an ITCase, and once completed I will provide the code.

Best,
JING ZHANG
Re: stream processing savepoints and watermarks question
Hi JING,

Thanks for the case, but I am not sure this would happen. As far as I know, an event timer can only be triggered when there is a watermark (except in the "quiesce phase"), and no watermark can advance after MAX_WATERMARK is received.

Best,
Guowei
Re: stream processing savepoints and watermarks question
Hi Guowei,

I can provide a case that I have encountered in which timers fire indefinitely when doing a drain savepoint. After an event timer is triggered, it registers another event timer whose timestamp equals the timestamp of the triggered timer plus an interval. If a MAX_WATERMARK comes, the timer is triggered, registers another timer, and so on forever. I'm not sure whether Marco has met a similar problem.

Best,
JING ZHANG
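The case above can be modeled without Flink as a toy timer queue. The fireUpTo helper and its cap parameter are purely illustrative; the cap is only there so the MAX_WATERMARK run terminates in the demo:

```java
import java.util.PriorityQueue;

// Toy model (no Flink dependency) of a timer that re-registers itself at
// timeout + interval each time it fires, as in the case described above.
public class ReRegisteringTimer {

    // Mimics an operator firing every timer <= the incoming watermark,
    // in timeout order; 'cap' artificially bounds the loop for the demo.
    static long fireUpTo(long watermark, long firstTimer, long intervalMs, long cap) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        timers.add(firstTimer);
        long fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark && fired < cap) {
            long t = timers.poll();
            fired++;
            timers.add(t + intervalMs); // onTimer re-registers the next timer
        }
        return fired;
    }

    public static void main(String[] args) {
        // An ordinary watermark only drains timers up to itself...
        System.out.println(fireUpTo(10_000L, 1_000L, 1_000L, 1_000_000L)); // prints 10
        // ...but MAX_WATERMARK keeps the chain firing; only the cap stops it here.
        System.out.println(fireUpTo(Long.MAX_VALUE, 1_000L, 1_000L, 1_000_000L)); // prints 1000000
    }
}
```

With a finite watermark the re-registration chain stops as soon as the next timeout passes the watermark; with MAX_WATERMARK it never does, which is exactly the infinite firing described above.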
Re: stream processing savepoints and watermarks question
Hi Marco,

Indeed, as JING mentioned, if you drain when triggering the savepoint, you will encounter this MAX_WATERMARK. But I have one question: in theory, even with MAX_WATERMARK, there should not be an infinite number of timers, and these timers should be generated by the application code. Could you share your code, if that is convenient for you?

Best,
Guowei
Re: stream processing savepoints and watermarks question
Hi Marco,

Did you specify the drain flag when stopping the job with a savepoint? If the --drain flag is specified, a MAX_WATERMARK will be emitted before the last checkpoint barrier.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

Best,
JING ZHANG

Marco Villalobos wrote on Friday, September 24, 2021, 12:54 PM:

> Something strange happened today. When we tried to shut down a job with a savepoint, the watermarks became equal to 2^63 - 1.
>
> This caused timers to fire indefinitely and crash downstream systems with a flood of untrue data.
>
> We are using event-time processing with Kafka as our source. It seems impossible for a watermark to be that large.
>
> I know this is possible when streaming with the batch execution mode, but this was stream processing.
>
> What can cause this? Is this normal behavior when creating a savepoint?
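For reference, the 2^63 - 1 value Marco observed is simply Long.MAX_VALUE, which, as far as I know, is the timestamp Flink uses for MAX_WATERMARK. A quick sanity check (the class name here is just for the demo):

```java
import java.math.BigInteger;

// Confirm that the observed watermark value 2^63 - 1 equals Long.MAX_VALUE.
public class MaxWatermarkValue {
    public static void main(String[] args) {
        BigInteger expected = BigInteger.TWO.pow(63).subtract(BigInteger.ONE);
        System.out.println(expected.equals(BigInteger.valueOf(Long.MAX_VALUE))); // prints true
        System.out.println(Long.MAX_VALUE); // prints 9223372036854775807
    }
}
```

So a watermark of 2^63 - 1 is not corruption; it is the sentinel "end of time" watermark that --drain emits.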