OK, thank you. Much appreciated.

Yes, I don’t want the job to fail. The source has very little data that is 
being pumped into a Broadcast stream.

From: Robert Metzger <rmetz...@apache.org>
Date: Friday, May 8, 2020 at 9:51 AM
To: Jingsong Li <jingsongl...@gmail.com>
Cc: Senthil Kumar <senthi...@vmware.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: Correctly implementing of SourceFunction.run()

Hey Kumar,

if you are swallowing any and all exceptions, your Flink job will not fail 
because of issues arising from your custom source. It might make sense to stop 
the source if you are catching an InterruptedException.

Throwing exceptions out of the run method basically signals the Flink framework 
that the source has failed, and thus the job will fail / go into recovery.
The way you are using the cancel() method + isRunning variable is correct for 
having a proper cancellation behavior of the source.



On Fri, May 8, 2020 at 3:31 AM Jingsong Li 
<jingsongl...@gmail.com<mailto:jingsongl...@gmail.com>> wrote:
Hi,

Some suggestions from my side:
- synchronized (checkpointLock) to some work and ctx.collect?
- Put Thread.sleep(interval) out of try catch? Maybe should not swallow 
interrupt exception (Like cancel the job).

Best,
Jingsong Lee

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar 
<senthi...@vmware.com<mailto:senthi...@vmware.com>> wrote:
I am implementing a source function which periodically wakes up and consumes 
data from S3.


My currently implementation is like so.

Following: 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

Is it safe to simply swallow any and all exceptions in the run method and just 
rely on this.isRunning variable to quit the run() method?

Cheers
Kumar

---


@Override
public void cancel() {
    this.isRunning = false;   // Set volatile state variable, initially set to 
true on Class
}

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */
            ctx.collect(out);
            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to 
milli seconds
        } catch(Throwable t) {
            // Simply swallow
        }
    }
}



--
Best, Jingsong Lee

Reply via email to