Hi James,

I just went through exactly what you're doing at my job. While I'm using
Apache Beam rather than the Flink API directly, the concepts still apply.
TL;DR: it works as expected.

What I did was set up a Kafka topic listener that always throws an exception
if the last received message's timestamp is less than 5 minutes old at
processing time (basically simulating a bug that only gets fixed after 5
minutes). Then I let the pipeline run its normal processing and sent a
message on that exception-triggering topic.
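
For reference, here's a rough sketch of that failure-injecting step as a Beam
DoFn. The class and names below are made up for this email rather than copied
from my actual job:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Fails on any "fresh" message to simulate a bug that only gets fixed
    // ~5 minutes later; older messages pass through untouched.
    public class FailOnFreshMessages extends DoFn<KV<String, String>, KV<String, String>> {
      @ProcessElement
      public void process(ProcessContext c) {
        Instant messageTime = c.timestamp();   // element timestamp from Kafka
        Instant now = Instant.now();           // processing time
        if (new Duration(messageTime, now).isShorterThan(Duration.standardMinutes(5))) {
          // Simulated bug: blow up the pipeline for messages younger than 5 minutes.
          throw new RuntimeException("Simulated failure for message at " + messageTime);
        }
        c.output(c.element());                 // "fixed" code path: pass the message on
      }
    }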

I have set Flink up to retry twice. Beam exposes this as a flag
(numberOfExecutionRetries) [1], but it boils down to one of the Flink
restart-strategy settings described here [2]. Once Flink encounters an
exception, for example from my exception-throwing topic listener, it restores
itself from the last checkpoint, which includes the Kafka offsets and whatever
state the transforms have built up. Effectively this replays the messages
received after that checkpoint, and of course my exception is thrown again
when the offending message is reprocessed. After the second retry, Flink gives
up and the job stops (just as if you had cancelled it). If run in an IDE, the
process stops; if run on a Flink cluster, the job stops.
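
I've only ever set this through the Beam flag, but based on [2] the direct
Flink API equivalent looks to be a fixed-delay restart strategy, something
like this (an untested sketch on my side):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetryTwiceExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Give up after 2 attempts, waiting 10 seconds between restarts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(10)));
        // ... add your sources/operators here and call env.execute("job") ...
      }
    }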

When a Flink job stops, it normally cleans up its checkpoints, unless you
externalize them; for Beam that's the externalizedCheckpointsEnabled flag set
to true. Check the docs to see which Flink setting that maps to.
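
Since you're using the Flink API directly, I believe the retention setting
lives on the CheckpointConfig; again, this is a sketch based on the docs
rather than code I've run myself:

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointsExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // checkpoint every 60 seconds
        // Keep the last checkpoint around when the job is cancelled or stops,
        // so it can be used with -s on the next start.
        env.getCheckpointConfig()
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // ... add your sources/operators here and call env.execute("job") ...
      }
    }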

Then, when you restart the Flink job, just add the -s Flink flag followed by
the path of the latest checkpoint. If you're running from an IDE, say
IntelliJ, you can still pass the -s flag as a program argument to the main
method launcher.

We use a bash script to restart our Flink jobs on our UAT/PROD boxes for now.
You can use this command to find the latest checkpoint under that path:

    find "$PATH_WHERE_YOU_SAVE_STATE" -name "_metadata" -print0 | xargs -r -0 ls -1 -t | head -1

And you know what PATH_WHERE_YOU_SAVE_STATE is, because you have to specify
it when you initially start the Flink job; for Beam, that's the
stateBackendStoragePath flag. Restarting with -s pointing at that checkpoint
picks up the latest state from before the pipeline stopped and continues from
it with your updated jar that handles the exception properly.

Also note that I think you can set all of these flags from Java code. In Beam
it's just a matter of passing them in the main method's args parameter, or
setting them on the PipelineOptions object once you've built it from args.
I've never used the Flink libraries directly, just the runner, but from [1]
and [3] it looks like you can configure things in code there too if you
prefer that.
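
For the Beam side, this is roughly the shape of our main method (the option
names are the ones documented in [1], and the path is just a placeholder, so
double-check everything against the runner version you're on):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class Main {
      public static void main(String[] args) {
        // Anything passed as --flagName=value on the command line (or in the
        // IDE run configuration) is picked up here and can be overridden in code.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);

        options.setNumberOfExecutionRetries(2);
        options.setExternalizedCheckpointsEnabled(true);
        options.setCheckpointingInterval(60_000L);                      // ms
        options.setStateBackendStoragePath("file:///path/you/choose");  // $PATH_WHERE_YOU_SAVE_STATE

        Pipeline pipeline = Pipeline.create(options);
        // ... build the actual pipeline here ...
        pipeline.run();
      }
    }

If I remember right there is also a savepointPath option on
FlinkPipelineOptions, which is what we point at the checkpoint found by the
bash one-liner above when we restart.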

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/#configuration


On Wed, Feb 16, 2022 at 12:28 PM Sandys-Lumsdaine, James <
james.sandys-lumsda...@systematica.com> wrote:

> Thanks for your reply, Piotr.
>
>
>
> Some follow on questions:
>
> "Nevertheless you might consider enabling them as this allows you to
> manually cancel the job if it enters an endless recovery/failure loop, fix
> the underlying issue, and restart the job from the externalised checkpoint."
>
>
>
> How is this done? Are you saying the retained checkpoint (i.e. the last
> checkpoint that isn’t deleted) can somehow be used when restarting the
> Flink application? If I am running in my IDE and just using the local
> streaming environment, how can I test my recovery code either with a
> retained checkpoint? All my attempts so far just say “No checkpoint found
> during restore.” Do I copy the checkpoint into a savepoint directory and
> treat it like a savepoint?
>
>
>
> On the topic of savepoints, that web page [1] says I need to use
> “bin/flink savepoint” or “bin/flink stop --savepointPath” – but again, if
> I’m currently not running in a real cluster how else can I create and
> recover from the save points?
>
>
>
> From what I’ve read there is state, checkpoints and save points – all of
> them hold state - and currently I can’t get any of these to restore when
> developing in an IDE and the program builds up all state from scratch. So
> what else do I need to do in my Java code to tell Flink to load a savepoint?
>
>
>
> Thanks,
>
>
>
> James.
>
>
>
>
>
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* 16 February 2022 16:36
> *To:* James Sandys-Lumsdaine <jas...@hotmail.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Basic questions about resuming stateful Flink jobs
>
>
>
>
> Hi James,
>
>
>
> Sure! The basic idea of checkpoints is that they are fully owned by the
> running job and used for failure recovery. Thus by default, if you stop
> the job, its checkpoints are removed. If you want to stop a job and then
> later resume from the same point where it previously stopped, you most
> likely want to use savepoints [1]. You can stop the job with a savepoint
> and later restart another job from that savepoint.
>
>
>
> Regarding externalised checkpoints: technically you could use them in
> a similar way, but there is no command like "take a checkpoint and stop
> the job". Nevertheless you might consider enabling them, as this allows you
> to manually cancel the job if it enters an endless recovery/failure
> loop, fix the underlying issue, and restart the job from the externalised
> checkpoint.
>
>
>
> Best,
>
> Piotrek
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/savepoints/
>
>
>
> śr., 16 lut 2022 o 16:44 James Sandys-Lumsdaine <jas...@hotmail.com>
> napisał(a):
>
> Hi all,
>
>
>
> I have a 1.14 Flink streaming workflow with many stateful functions that
> has an FsStateBackend and checkpointing enabled, although I haven't set a
> location for the checkpointed state.
>
>
>
> I've really struggled to understand how I can stop my Flink job and
> restart it and ensure it carries on exactly where it left off by using the
> state or checkpoints or savepoints. This is not clearly explained in the
> book or the web documentation.
>
>
>
> Since I have no control over my Flink job id I assume I can not force
> Flink to pick up the state recorded under the jobId directory for the
> FsStateBackend. Therefore I *think* Flink should read back in the last
> checkpointed data but I don't understand how to force my program to read
> this in? Do I use retained checkpoints or not? How can I force my program
> either use the last checkpointed state (e.g. when running from my IDE,
> starting and stopping the program) or maybe force it *not* to read in the
> state and start completely fresh?
>
>
>
> The web documentation talks about bin/flink but I am running from my IDE
> so I want my Java code to control this progress using the Flink API in Java.
>
>
>
> Can anyone give me some basic pointers, as I'm obviously missing something
> fundamental on how to allow my program to be stopped and started without
> losing all the state?
>
>
>
> Many thanks,
>
>
>
> James.
>
>
>
