For this you would use externalised checkpoints:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
Unfortunately, the doc is still a bit sparse, but it's basically a combination of savepoints and checkpoints: checkpoints are not cleaned up when a job fails, and you can restore from them as you would from a savepoint.

Best,
Aljoscha

> On 22 Mar 2017, at 21:25, Jins George <[email protected]> wrote:
>
> Thanks Aljoscha for the clarification. Savepoints work fine in the case of a
> controlled stop and restart. In the case of a failure (say the entire job
> failed due to a node crash or an application software bug), is there a way to
> resume from the checkpoint on restarting the application? The checkpoint
> location is configured with HDFS.
>
> Thanks,
> Jins George
>
> On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
>> As I mentioned before, when running a Flink job and simply cancelling it, all
>> state about that job is discarded (with some exceptions, such as
>> externalised checkpoints). If you want the state of a job to survive a
>> cancellation you have to perform a savepoint [1], and when restarting
>> the job you have to specify the savepoint from which you want to restore.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>
>>> On 22 Mar 2017, at 01:43, Raghu Angadi <[email protected]> wrote:
>>>
>>> Expanding a bit more on what Dan wrote:
>>>
>>> - In Dataflow, there are two modes of restarting a job: a regular stop
>>>   and then start, and an *update*. The checkpoint is carried over only in
>>>   the case of an update.
>>> - An update is the only way to keep 'exactly-once' semantics.
>>> - If the requirements are not very strict, you can enable offset commits
>>>   in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
>>>   start reading from approximately where it left off in the previous run.
>>>   - When offset commits are enabled, KafkaIO could do this by
>>>     implementing the 'finalize()' API on KafkaCheckpointMark [1].
>>>   - This is runner independent.
>>>   - The compromise is that this might skip a few records or re-read a few
>>>     old records when the pipeline is restarted.
>>>   - This does not override 'resume from checkpoint' support when the
>>>     runner provides a KafkaCheckpointMark. Externally committed offsets
>>>     are used only when KafkaIO's own CheckpointMark is not available.
>>>
>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
>>>
>>> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <[email protected]> wrote:
>>>
>>>> [We should keep the user list involved if that's where the discussion
>>>> originally was :)]
>>>>
>>>> Jins George's original question was a good one. The right way to resume
>>>> from the previous offset here is what we're already doing: use the
>>>> KafkaCheckpointMark. In Beam, the runner maintains the state, not the
>>>> external system. Beam runners are responsible for maintaining the
>>>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work.
>>>> If a user disables checkpointing, then they are explicitly opting into
>>>> "redo all work" on restart.
>>>>
>>>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>>>> provided, then I'm inclined to agree with Amit that there may simply be a
>>>> bug in the FlinkRunner. (+aljoscha)
>>>>
>>>> For what Mingmin Xu asked about: presumably if the Kafka source is
>>>> initially configured to "read from latest offset", then when it restarts
>>>> with no checkpoint it will automatically go find the latest offset. That
>>>> would mimic at-most-once semantics in a buggy runner that did not provide
>>>> checkpointing.
>>>>
>>>> Dan
>>>>
>>>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <[email protected]> wrote:
>>>>
>>>>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
>>>>> Can it restore during a job restart? (I haven't tested the runner in
>>>>> streaming mode for some time.)
>>>>>
>>>>> Regarding data completeness, I would use at-most-once when losing a little
>>>>> data (mostly on task-node failure) is tolerable, compared to the
>>>>> performance cost introduced by 'state'/'checkpoint'.
>>>>>
>>>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <[email protected]> wrote:
>>>>>>
>>>>>>> Moving the discussion to the dev list.
>>>>>>>
>>>>>>> Savepoints in Flink, and checkpoints in Spark, should be good enough to
>>>>>>> handle this case.
>>>>>>>
>>>>>>> When people don't enable these features, for example when they only need
>>>>>>> at-most-once
>>>>>>
>>>>>> The Spark runner forces checkpointing on any streaming (Beam)
>>>>>> application, mostly because it uses mapWithState for reading from
>>>>>> UnboundedSource and updateStateByKey for GroupByKey - so by design, the
>>>>>> Spark runner is at-least-once. Generally, I always thought that
>>>>>> applications that require at-most-once are focused on processing time
>>>>>> only, as they only care about whatever gets ingested into the pipeline
>>>>>> at a specific time and don't care (up to the point of losing data)
>>>>>> about correctness.
>>>>>> I would be happy to hear more about your use case.
>>>>>>
>>>>>>> semantics, each unbounded IO should try its best to restore from the
>>>>>>> last offset, although the CheckpointMark is null. Any ideas?
>>>>>>>
>>>>>>> Mingmin
>>>>>>>
>>>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <[email protected]> wrote:
>>>>>>>
>>>>>>>> hey,
>>>>>>>>
>>>>>>>> The native Beam UnboundedSource API supports resuming from a
>>>>>>>> checkpoint -- that specifically happens here
>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>>>>
>>>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the
>>>>>>>> most recent savepoint upon restore.
>>>>>>>>
>>>>>>>> There shouldn't be any "special" Flink runner support needed, nor is
>>>>>>>> the State API involved.
>>>>>>>>
>>>>>>>> Dan
>>>>>>>>
>>>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>>>>
>>>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just
>>>>>>>>> thinking out loud)?
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>>>>
>>>>>>>>>> From KafkaIO itself, it looks like it either starts from the
>>>>>>>>>> beginning or from the latest offset. It's designed to leverage
>>>>>>>>>> `UnboundedSource.CheckpointMark` during initialization, but so far I
>>>>>>>>>> don't see it provided by runners. At the moment Flink savepoints are
>>>>>>>>>> a good option; I created a JIRA (BEAM-1775
>>>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in
>>>>>>>>>> KafkaIO.
>>>>>>>>>>
>>>>>>>>>> Mingmin
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> Are you using Flink savepoints [1] when restoring your application?
>>>>>>>>>> If you use them, the Kafka offset should be stored in state and the
>>>>>>>>>> job should restart from the correct position.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>>>
>>>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to
>>>>>>>>>>> consume data from Kafka, apply some transformations, and persist to
>>>>>>>>>>> HBase.
>>>>>>>>>>>
>>>>>>>>>>> If I restart the application (due to failure/manual restart), the
>>>>>>>>>>> consumer does not resume from the offset where it was prior to
>>>>>>>>>>> restart. It always resumes from the latest offset.
>>>>>>>>>>>
>>>>>>>>>>> If I enable Flink checkpointing with the HDFS state backend, the
>>>>>>>>>>> system appears to resume from the earliest offset.
>>>>>>>>>>>
>>>>>>>>>>> Is there a recommended way to resume from the offset where it was
>>>>>>>>>>> stopped?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Jins George
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> ----
>>>>>>>>>> Mingmin
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>> [email protected]
>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>> Talend - http://www.talend.com
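The two resume paths discussed in this thread (exact resume from a runner-provided CheckpointMark vs. approximate resume from offsets committed to Kafka via finalize) can be sketched as a toy, runner-independent model. This is plain Java with no Beam or Kafka dependencies; `ResumeSketch`, its nested `CheckpointMark`, `finalizeCheckpoint`, and `startOffset` are illustrative stand-ins, not the real Beam API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the two resume paths discussed in this thread:
// 1) the runner hands the reader its own CheckpointMark (exact resume), or
// 2) no mark is available and the reader falls back to offsets that were
//    committed externally via finalize() (approximate resume).
public class ResumeSketch {

    // Stand-in for Beam's UnboundedSource.CheckpointMark for one partition.
    static class CheckpointMark {
        final long nextOffset;
        CheckpointMark(long nextOffset) { this.nextOffset = nextOffset; }

        // Stand-in for finalizeCheckpoint(): commit the offset externally
        // (in KafkaIO this would be an offset commit to Kafka itself).
        void finalizeCheckpoint(Map<String, Long> committedOffsets, String partition) {
            committedOffsets.put(partition, nextOffset);
        }
    }

    // Decide where to start reading: prefer the runner-provided mark;
    // otherwise fall back to externally committed offsets; otherwise
    // start from "latest" (modelled here as -1).
    static long startOffset(CheckpointMark mark, Map<String, Long> committed, String partition) {
        if (mark != null) {
            return mark.nextOffset;                   // exact resume
        }
        Long external = committed.get(partition);
        return external != null ? external : -1L;     // approximate resume, or latest
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();

        // The runner checkpoints at offset 42 and later finalizes it.
        CheckpointMark mark = new CheckpointMark(42L);
        mark.finalizeCheckpoint(committed, "topic-0");

        // Restart WITH the runner's mark: exact resume.
        System.out.println(startOffset(mark, committed, "topic-0"));   // 42

        // Restart WITHOUT a mark (plain cancel + run): fall back to the
        // externally committed offset, possibly skipping/re-reading a few records.
        System.out.println(startOffset(null, committed, "topic-0"));   // 42

        // Nothing committed for this partition: start from latest.
        System.out.println(startOffset(null, committed, "topic-1"));   // -1
    }
}
```

The third case is what Jins George observed: no mark restored, nothing committed, so the consumer starts from the latest offset.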

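For reference, the two operational recipes suggested in the thread, Aljoscha's savepoint flow and his externalized-checkpoint flow, look roughly like this as a config/CLI fragment (Flink 1.3; the job id, paths, and jar name are placeholders, and `state.checkpoints.dir` is the config key I believe applies here):

```shell
# --- Controlled stop/restart via savepoint ---
bin/flink savepoint <job-id> hdfs:///flink/savepoints   # snapshot state, incl. Kafka offsets
bin/flink cancel <job-id>
bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> my-beam-pipeline.jar

# --- Surviving failures via externalized checkpoints ---
# flink-conf.yaml:
#   state.checkpoints.dir: hdfs:///flink/ext-checkpoints
# In the job, enable retention, e.g.:
#   env.getCheckpointConfig().enableExternalizedCheckpoints(
#       ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
# After a crash, restore from the retained checkpoint as you would from a savepoint:
bin/flink run -s hdfs:///flink/ext-checkpoints/<retained-checkpoint-path> my-beam-pipeline.jar
```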