Hi Senthil,

From a quick look at the code, it seems that the cancel() method of your
source function should be called and the thread it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (XXXX-ID) SYNC_SAVEPOINT on task XXXXX" or something
similar that mentions a synchronous savepoint,

as well as any later messages containing XXXX-ID, to see whether the
savepoint completes successfully.

2) check whether this behavior persists in Flink 1.10.

Thanks,
Kostas

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> Robert,
>
> Thank you once again! We are currently using the “short” Thread.sleep()
> approach, and it seems to be working fine.
>
> Cheers
>
> Kumar
>
> From: Robert Metzger <rmetz...@apache.org>
> Date: Tuesday, June 2, 2020 at 2:40 AM
> To: Senthil Kumar <senthi...@vmware.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Age old stop vs cancel debate
>
> Hi Kumar,
>
> this is more a Java question than a Flink question now :) If it is easy to do
> in your code, I would regularly check the isRunning flag (by using short
> Thread.sleep() calls) to get proper cancellation behavior.
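A minimal sketch of the short-sleep approach, written as plain Java so it runs without Flink on the classpath. The class ChunkedSleepSource, its pollIntervalMillis/checkEveryMillis parameters and fetchBatch() (standing in for the S3 read) are hypothetical; in a real job, run() and cancel() would be your SourceFunction's methods and records would go to SourceContext.collect() instead of emit().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a custom SourceFunction (all names hypothetical).
final class ChunkedSleepSource {
    // volatile so the write in cancel() is visible to the thread in run()
    private volatile boolean isRunning = true;
    private final long pollIntervalMillis; // full wait between fetches ("hours" in the thread)
    private final long checkEveryMillis;   // how often the flag is re-checked while waiting
    private final List<String> emitted = new ArrayList<>();
    private long fetches = 0;              // only touched by the run() thread

    ChunkedSleepSource(long pollIntervalMillis, long checkEveryMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
        this.checkEveryMillis = checkEveryMillis;
    }

    void run() throws InterruptedException {
        while (isRunning) {
            emit(fetchBatch());
            // Wait out the full interval in short chunks, so a flag cleared by
            // cancel() is noticed within about checkEveryMillis, no interrupt needed.
            long remaining = pollIntervalMillis;
            while (isRunning && remaining > 0) {
                long chunk = Math.min(checkEveryMillis, remaining);
                // If the thread is interrupted, sleep() throws and run() exits.
                Thread.sleep(chunk);
                remaining -= chunk;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Hypothetical placeholder for "get the data from S3".
    private String fetchBatch() {
        fetches++;
        return "batch-" + fetches;
    }

    private synchronized void emit(String record) {
        emitted.add(record);
    }

    synchronized List<String> emitted() {
        return new ArrayList<>(emitted);
    }
}
```

The worst-case delay between cancel() and the loop exiting is roughly checkEveryMillis, so the chunk size trades shutdown latency against extra wake-ups; Kumar's 5-minute chunks are the same idea with a larger value.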
>
> If this makes your code very complicated, you could instead manually interrupt
> your worker thread. I would only use this approach if you are sure that your
> code (and the libraries you are using) handle interrupts properly.
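A sketch of the manual-interrupt variant, again in plain Java with hypothetical names (InterruptingSource, and a counter standing in for fetching and emitting). run() records its own thread, and cancel() clears the flag and interrupts that thread, so a long Thread.sleep() ends immediately.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for a SourceFunction that interrupts its own worker
// thread from cancel() (all names hypothetical).
final class InterruptingSource {
    private volatile boolean isRunning = true;
    private final Object lock = new Object();
    private Thread runner; // guarded by lock: the thread currently executing run()
    private final long pollIntervalMillis;
    private final AtomicInteger batches = new AtomicInteger();

    InterruptingSource(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    void run() {
        synchronized (lock) {
            runner = Thread.currentThread();
        }
        try {
            while (isRunning) {
                batches.incrementAndGet(); // hypothetical placeholder for fetch + emit
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    // Woken by cancel() or by the framework: keep the interrupt
                    // status set and leave the loop instead of swallowing it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Cleared under the lock so a late cancel() cannot interrupt this
            // thread after run() has returned.
            synchronized (lock) {
                runner = null;
            }
        }
    }

    void cancel() {
        isRunning = false;
        synchronized (lock) {
            if (runner != null) {
                runner.interrupt();
            }
        }
    }

    int batches() {
        return batches.get();
    }
}
```

As Robert says, this only behaves well if everything called inside the loop (for example, the S3 client) also handles interrupts correctly.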
>
> Sorry that I cannot give you a more actionable response. It depends a lot on
> the structure of your code and the libraries you are calling into.
>
> Best,
>
> Robert
>
> On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> Hi Robert,
>
> Would appreciate more insights please.
>
> What we are noticing: when the Flink job is issued a stop command,
> Thread.sleep() does not receive an InterruptedException.
>
> It does receive the exception when the Flink job is issued a cancel
> command.
>
> In both cases (cancel and stop), the cancel() method is called (to set
> the isRunning variable to false).
>
> However, because the thread is not interrupted on stop, the run loop does
> not get a chance to check the isRunning variable until the sleep ends.
>
> For now, we call Thread.sleep() for 5 minutes at a time (instead of for the
> normal interval, which is measured in hours).
>
> Sleeping for 5 minutes at a time gives us a chance to check the isRunning
> variable regularly.
>
> Another approach would be to save the current thread (Thread.currentThread())
> before calling Thread.sleep(), and then manually call interrupt() on that
> thread from the cancel() function.
>
> What is your recommendation?
>
> Cheers
>
> Kumar
>
> From: Robert Metzger <rmetz...@apache.org>
> Date: Friday, May 29, 2020 at 4:38 AM
> To: Senthil Kumar <senthi...@vmware.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Age old stop vs cancel debate
>
> Hi Kumar,
>
> The way you've implemented your custom source sounds right: having a
> "running" flag that is checked by the run() method and changed in
> cancel().
>
> Also, it is good that you are properly handling the interrupt set by Flink.
> Some people ignore InterruptedExceptions, which makes it difficult (basically
> impossible) for Flink to stop the job.
>
> Best,
>
> Robert
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> We are on Flink 1.9.0.
>
> I have a custom SourceFunction in which I rely on isRunning being set to false
> by the cancel() function to exit the run loop.
>
> My run loop essentially fetches the data from S3 and then simply sleeps
> (Thread.sleep()) for a specified time interval.
>
> When a job is cancelled, SourceFunction.cancel() is called, which sets
> isRunning to false.
>
> In addition, Thread.sleep() is interrupted, the isRunning variable (now
> false) is checked, and the run loop exits.
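A plain-Java sketch of the loop described above (hypothetical names), useful for reproducing the difference outside Flink. Clearing isRunning alone leaves the thread asleep until the interval ends; only an interrupt, which this thread says Flink sends on cancel, wakes it early. The catch block exits rather than looping again, because sleeping again with the interrupt status restored would make the next sleep() throw immediately.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of the described run loop (all names hypothetical).
final class SleepingSource {
    private volatile boolean isRunning = true;
    private final long pollIntervalMillis;
    private final AtomicInteger batches = new AtomicInteger();

    SleepingSource(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    void run() {
        while (isRunning) {
            batches.incrementAndGet(); // hypothetical placeholder for "get the data from S3"
            try {
                Thread.sleep(pollIntervalMillis); // not woken by cancel() alone
            } catch (InterruptedException e) {
                // Interrupted by whoever stops the task: restore the status and exit.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false; // flag only; a sleeping run() will not see it until it wakes
    }

    int batches() {
        return batches.get();
    }
}
```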
>
> We noticed that when we “stop” the Flink job, Thread.sleep() is not
> interrupted.
>
> It also appears that SourceFunction.cancel() is not called (which
> seems like the correct behavior for “stop”).
>
> My question: what’s the “right” way to exit the run() loop when the Flink job
> receives a stop command?
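One plain-Java alternative, not proposed in the thread itself: wait on a java.util.concurrent.CountDownLatch instead of calling Thread.sleep(), and count it down in cancel(). The wait then ends as soon as cancel() runs, whether or not the thread is interrupted. This assumes, as Kumar reported later in the thread, that cancel() is invoked on stop as well; LatchWaitSource and its parameters are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch: wait on a latch instead of Thread.sleep() so that
// cancel() wakes the loop directly (all names hypothetical).
final class LatchWaitSource {
    private final CountDownLatch cancelled = new CountDownLatch(1);
    private final long pollIntervalMillis;
    private final AtomicInteger batches = new AtomicInteger();

    LatchWaitSource(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    void run() throws InterruptedException {
        while (cancelled.getCount() > 0) {
            batches.incrementAndGet(); // hypothetical placeholder for fetch + emit
            // await() returns true as soon as cancel() counts down, false once the
            // timeout elapses; it still throws InterruptedException on interrupt.
            if (cancelled.await(pollIntervalMillis, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
    }

    void cancel() {
        cancelled.countDown();
    }

    int batches() {
        return batches.get();
    }
}
```

Because await() still throws InterruptedException, a cancel path that also interrupts the thread keeps working.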
>
> My understanding was that there used to be a Stoppable interface (which was
> removed in 1.9.0).
>
> Would appreciate any insights.
>
> Cheers
>
> Kumar
