For this you would use externalised checkpoints: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html

Unfortunately, the doc is still a bit sparse, but an externalised checkpoint is 
basically a combination of a savepoint and a checkpoint: the checkpoint is not 
cleaned up when a job fails, and you can restore from it as you would from a savepoint.
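
For reference, enabling them on the Flink side looks roughly like this (a sketch
against the Flink 1.3-era API; the interval, the directory, and how you reach the
StreamExecutionEnvironment from a Beam/FlinkRunner pipeline are illustrative
assumptions):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointsSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Take a checkpoint every 60s; the checkpoint metadata is written to the
    // directory configured via state.checkpoints.dir in flink-conf.yaml.
    env.enableCheckpointing(60_000);

    // Retain the latest checkpoint when the job fails or is cancelled, so it can
    // be used for a restore just like a savepoint.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // On restart, point the job at the retained checkpoint, e.g.:
    //   bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-<n> your-job.jar
  }
}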

Best,
Aljoscha

> On 22 Mar 2017, at 21:25, Jins George <[email protected]> wrote:
> 
> 
> Thanks Aljoscha for the clarification. Savepoints work fine in the case of a 
> controlled stop and restart. In the case of a failure (say the entire job failed 
> due to a node crash or an application software bug), is there a way to resume 
> from the checkpoint on restarting the application? The checkpoint location is 
> configured with HDFS.
> 
> Thanks,
> Jins George
> 
> On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:
>> As I mentioned before, when you run a Flink Job and simply cancel it, all 
>> state about that job is discarded (with some exceptions, such as 
>> externalised checkpoints). If you want the state of a Job to survive a 
>> cancellation you have to perform a savepoint [1], and when restarting 
>> the Job you have to specify the savepoint from which you want to restore.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
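>> 
>> The usual flow, roughly: trigger the savepoint with the Flink CLI, e.g.
>> `bin/flink savepoint <jobID>` (or `bin/flink cancel -s <targetDir> <jobID>` to
>> cancel with a savepoint), and then resubmit with
>> `bin/flink run -s <savepointPath> your-job.jar`.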
>> 
>>> On 22 Mar 2017, at 01:43, Raghu Angadi <[email protected]> wrote:
>>> 
>>> Expanding a bit more on what Dan wrote:
>>> 
>>>   - In Dataflow, there are two modes of restarting a job: a regular stop
>>>   and then start, and an *update*. The checkpoint is carried over only in the
>>>   case of update.
>>>   - Update is the only way to keep 'exactly-once' semantics.
>>>   - If the requirements are not very strict, you can enable offset commits
>>>   in Kafka itself. KafkaIO lets you configure this (rough sketch after the
>>>   reference below). Here the pipeline would start reading from approximately
>>>   where it left off in the previous run.
>>>      - When offset commits are enabled, KafkaIO could do this by implementing
>>>      the 'finalize()' API on KafkaCheckpointMark [1].
>>>      - This is runner independent.
>>>      - The compromise is that this might skip a few records or read a few
>>>      old records when the pipeline is restarted.
>>>      - This does not override 'resume from checkpoint' support when the runner
>>>      provides a KafkaCheckpointMark. Externally committed offsets are used only
>>>      when KafkaIO's own CheckpointMark is not available.
>>> 
>>> [1]:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
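>>> 
>>> Roughly what that configuration could look like (a sketch only; the exact
>>> builder methods, e.g. updateConsumerProperties, and the broker/topic/group
>>> names are assumptions, not a confirmed recipe for your KafkaIO version):
>>> 
>>> import com.google.common.collect.ImmutableMap;
>>> import java.util.Arrays;
>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>> 
>>> // Let Kafka itself track the consumer group's offsets, so a fresh run of the
>>> // pipeline resumes roughly where the previous one stopped even without a
>>> // CheckpointMark.
>>> KafkaIO.Read<String, String> read =
>>>     KafkaIO.<String, String>read()
>>>         .withBootstrapServers("broker1:9092")
>>>         .withTopics(Arrays.asList("my-topic"))
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializer(StringDeserializer.class)
>>>         .updateConsumerProperties(ImmutableMap.<String, Object>of(
>>>             "group.id", "my-pipeline",
>>>             "enable.auto.commit", true));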
>>> 
>>> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin <[email protected]> wrote:
>>> 
>>>> [We should keep user list involved if that's where the discussion
>>>> originally was :)]
>>>> 
>>>> Jins George's original question was a good one. The right way to resume
>>>> from the previous offset here is what we're already doing – use the
>>>> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
>>>> external system. Beam runners are responsible for maintaining the
>>>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
>>>> a user disables checkpointing, then they are explicitly opting into "redo
>>>> all work" on restart.
>>>> 
>>>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>>>> provided, then I'm inclined to agree with Amit that there may simply be a
>>>> bug in the FlinkRunner. (+aljoscha)
>>>> 
>>>> For what Mingmin Xu asked about: presumably if the Kafka source is
>>>> initially configured to "read from latest offset", when it restarts with no
>>>> checkpoint this will automatically go find the latest offset. That would
>>>> mimic at-most-once semantics in a buggy runner that did not provide
>>>> checkpointing.
>>>> 
>>>> Dan
>>>> 
>>>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu <[email protected]> wrote:
>>>> 
>>>>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
>>>>> Can it restore during a job restart? I have not tested the runner in streaming
>>>>> for some time.
>>>>> 
>>>>> Regarding data completeness, I would use at-most-once when losing a little
>>>>> data (mostly from task-node failure) is tolerated, compared to the performance
>>>>> cost introduced by 'state'/'checkpoint'.
>>>>> 
>>>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <[email protected]> wrote:
>>>>> 
>>>>>> On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <[email protected]> wrote:
>>>>>> 
>>>>>>> Move discuss to dev-list
>>>>>>> 
>>>>>>> Savepoints in Flink, and also checkpoints in Spark, should be good enough to
>>>>>>> handle this case.
>>>>>>> 
>>>>>>> When people don't enable these features, for example only need
>>>>>>> at-most-once
>>>>>> The Spark runner forces checkpointing on any streaming (Beam) application,
>>>>>> mostly because it uses mapWithState for reading from UnboundedSource and
>>>>>> updateStateByKey for GroupByKey - so by design, the Spark runner is
>>>>>> at-least-once. Generally, I always thought that applications that require
>>>>>> at-most-once are more focused on processing time only, as they only care
>>>>>> about whatever gets ingested into the pipeline at a specific time and
>>>>>> don't care (up to the point of losing data) about correctness.
>>>>>> I would be happy to hear more about your use case.
>>>>>> 
>>>>>>> semantics, each unbounded IO should try its best to restore from the last
>>>>>>> offset, even though the CheckpointMark is null. Any ideas?
>>>>>>> 
>>>>>>> Mingmin
>>>>>>> 
>>>>>>> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <[email protected]> wrote:
>>>>>>>> hey,
>>>>>>>> 
>>>>>>>> The native Beam UnboundedSource API supports resuming from checkpoint --
>>>>>>>> that specifically happens here
>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>>>>>>>> when the KafkaCheckpointMark is non-null.
>>>>>>>> 
>>>>>>>> The FlinkRunner should be providing the KafkaCheckpointMark from the most
>>>>>>>> recent savepoint upon restore.
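>>>>>>>> 
>>>>>>>> In UnboundedSource terms the resume is essentially the following sketch
>>>>>>>> (schematic only, not the FlinkRunner's actual code; 'source', 'options' and
>>>>>>>> the mark variable are illustrative stand-ins):
>>>>>>>> 
>>>>>>>> // A non-null KafkaCheckpointMark handed back to the source makes KafkaIO
>>>>>>>> // seek to the stored offsets instead of the configured start position.
>>>>>>>> UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> source = ...;
>>>>>>>> KafkaCheckpointMark restoredMark = ...;  // null on a fresh start
>>>>>>>> UnboundedSource.UnboundedReader<KafkaRecord<K, V>> reader =
>>>>>>>>     source.createReader(options, restoredMark);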
>>>>>>>> 
>>>>>>>> There shouldn't be any "special" Flink runner support needed, nor is the
>>>>>>>> State API involved.
>>>>>>>> 
>>>>>>>> Dan
>>>>>>>> 
>>>>>>>> On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>> 
>>>>>>>>> Wouldn't it be Flink runner specific?
>>>>>>>>> 
>>>>>>>>> Maybe the State API could do the same in a runner-agnostic way (just
>>>>>>>>> thinking out loud)?
>>>>>>>>> 
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>> 
>>>>>>>>> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>>>>>>>>> 
>>>>>>>>>> From KafkaIO itself, it looks like it either start_from_beginning or
>>>>>>>>>> start_from_latest. It's designed to leverage
>>>>>>>>>> `UnboundedSource.CheckpointMark` during initialization, but so far I don't
>>>>>>>>>> see it provided by runners. At the moment Flink savepoints are a good
>>>>>>>>>> option; I created a JIRA (BEAM-1775
>>>>>>>>>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in
>>>>>>>>>> KafkaIO.
>>>>>>>>>> 
>>>>>>>>>> Mingmin
>>>>>>>>>> 
>>>>>>>>>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>>>> 
>>>>>>>>>>    Hi,
>>>>>>>>>>    Are you using Flink savepoints [1] when restoring your application? If
>>>>>>>>>>    you use this the Kafka offset should be stored in state and it should
>>>>>>>>>>    restart from the correct position.
>>>>>>>>>> 
>>>>>>>>>>    Best,
>>>>>>>>>>    Aljoscha
>>>>>>>>>> 
>>>>>>>>>>    [1]
>>>>>>>>>>    https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>>>>>>>>>> On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
>>>>>>>>>>> Hello,
>>>>>>>>>>> 
>>>>>>>>>>> I am writing a Beam pipeline (streaming) with the Flink runner to consume
>>>>>>>>>>> data from Kafka, apply some transformations, and persist to HBase.
>>>>>>>>>>> 
>>>>>>>>>>> If I restart the application (due to failure/manual restart), the consumer
>>>>>>>>>>> does not resume from the offset where it was prior to restart. It always
>>>>>>>>>>> resumes from the latest offset.
>>>>>>>>>>> 
>>>>>>>>>>> If I enable Flink checkpointing with the HDFS state back-end, the system
>>>>>>>>>>> appears to resume from the earliest offset.
>>>>>>>>>>> 
>>>>>>>>>>> Is there a recommended way to resume from the offset where it was stopped?
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Jins George
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> ----
>>>>>>>>>> Mingmin
>>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>> [email protected]
>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> ----
>>>>>>> Mingmin
>>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> ----
>>>>> Mingmin
>>>>> 
>>>> 
> 
