OK, thank you. Much appreciated. Yes, I don’t want the job to fail. The source has very little data that is being pumped into a Broadcast stream.
From: Robert Metzger <rmetz...@apache.org> Date: Friday, May 8, 2020 at 9:51 AM To: Jingsong Li <jingsongl...@gmail.com> Cc: Senthil Kumar <senthi...@vmware.com>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: Correctly implementing of SourceFunction.run() Hey Kumar, if you are swallowing any and all exceptions, your Flink job will not fail because of issues arising from your custom source. It might make sense to stop the source if you are catching an InterruptedException. Throwing exceptions out of the run method basically signals the Flink framework that the source has failed, and thus the job will fail / go into recovery. The way you are using the cancel() method + isRunning variable is correct for having a proper cancellation behavior of the source. On Fri, May 8, 2020 at 3:31 AM Jingsong Li <jingsongl...@gmail.com<mailto:jingsongl...@gmail.com>> wrote: Hi, Some suggestions from my side: - synchronized (checkpointLock) to some work and ctx.collect? - Put Thread.sleep(interval) out of try catch? Maybe should not swallow interrupt exception (Like cancel the job). Best, Jingsong Lee On Fri, May 8, 2020 at 2:52 AM Senthil Kumar <senthi...@vmware.com<mailto:senthi...@vmware.com>> wrote: I am implementing a source function which periodically wakes up and consumes data from S3. My currently implementation is like so. Following: org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction Is it safe to simply swallow any and all exceptions in the run method and just rely on this.isRunning variable to quit the run() method? Cheers Kumar --- @Override public void cancel() { this.isRunning = false; // Set volatile state variable, initially set to true on Class } @Override public void run(SourceFunction.SourceContext<OUT> ctx) { while (this.isRunning) { try { OUT out = /* Do some work */ ctx.collect(out); Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milli seconds } catch(Throwable t) { // Simply swallow } } } -- Best, Jingsong Lee