hey,

The native Beam UnboundedSource API supports resuming from a checkpoint;
in KafkaIO that happens here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
whenever the KafkaCheckpointMark is non-null.
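To illustrate the idea, here is a toy model. The names (OffsetCheckpoint, DefaultStart, startOffset) are made up, and this is not the actual KafkaIO code at the link above. If the reader is handed a non-null checkpoint mark, the saved per-partition offsets take precedence. Only when there is no mark does it fall back to an earliest/latest policy. That would explain the "always resumes from the latest offset" behavior on a plain restart with no restored mark.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: OffsetCheckpoint, DefaultStart and startOffset are hypothetical
// names, not KafkaIO's real classes. It mimics the decision a reader makes at startup.
class ResumeSketch {

    // Hypothetical stand-in for a checkpoint mark: partition -> next offset to read.
    static final class OffsetCheckpoint {
        final Map<Integer, Long> nextOffsets;

        OffsetCheckpoint(Map<Integer, Long> nextOffsets) {
            this.nextOffsets = new HashMap<>(nextOffsets); // defensive copy
        }
    }

    // Hypothetical fallback policy used when no checkpoint covers a partition.
    enum DefaultStart { EARLIEST, LATEST }

    // Returns the offset a reader would start from for one partition.
    static long startOffset(int partition, OffsetCheckpoint mark, DefaultStart fallback,
                            long earliest, long latest) {
        if (mark != null && mark.nextOffsets.containsKey(partition)) {
            // A restored checkpoint wins: continue where the previous run stopped.
            return mark.nextOffsets.get(partition);
        }
        // No checkpoint (e.g. a fresh start without a savepoint): use the fallback.
        return fallback == DefaultStart.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        Map<Integer, Long> saved = new HashMap<>();
        saved.put(0, 1500L);
        OffsetCheckpoint restored = new OffsetCheckpoint(saved);

        // Restored from a savepoint: resumes at 1500 even though the fallback is LATEST.
        System.out.println(startOffset(0, restored, DefaultStart.LATEST, 0L, 9000L));
        // Plain restart with no checkpoint: jumps to the latest offset.
        System.out.println(startOffset(0, null, DefaultStart.LATEST, 0L, 9000L));
        // Partition 1 is not in the checkpoint, so the fallback applies.
        System.out.println(startOffset(1, restored, DefaultStart.EARLIEST, 0L, 9000L));
    }
}
```

In the real runner, the mark would come from the savepoint state rather than being built by hand as in main() here.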

The FlinkRunner should be providing the KafkaCheckpointMark from the most
recent savepoint upon restore.

There shouldn't be any "special" Flink runner support needed, nor is the
State API involved.

Dan

On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Wouldn't it be Flink runner specific?
>
> Maybe the State API could do the same in a runner-agnostic way (just
> thinking out loud)?
>
> Regards
> JB
>
> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>
>> From KafkaIO itself, it looks like it either does start_from_beginning or
>> start_from_latest. It's designed to leverage `UnboundedSource.CheckpointMark`
>> during initialization, but so far I don't see any runner providing it. At the
>> moment Flink savepoints are a good option; I created a JIRA (BEAM-1775
>> <https://issues.apache.org/jira/browse/BEAM-1775>) to handle it in KafkaIO.
>>
>> Mingmin
>>
>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <[email protected]> wrote:
>>
>>     Hi,
>>     Are you using Flink savepoints [1] when restoring your application? If you
>>     use them, the Kafka offset should be stored in state and the job should
>>     restart from the correct position.
>>
>>     Best,
>>     Aljoscha
>>
>>     [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>>     > On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
>>     >
>>     > Hello,
>>     >
>>     > I am writing a Beam pipeline (streaming) with the Flink runner to consume
>>     > data from Kafka, apply some transformations, and persist the results to
>>     > HBase.
>>     >
>>     > If I restart the application (due to a failure or a manual restart), the
>>     > consumer does not resume from the offset where it was prior to the
>>     > restart. It always resumes from the latest offset.
>>     >
>>     > If I enable Flink checkpointing with the HDFS state backend, the system
>>     > appears to resume from the earliest offset.
>>     >
>>     > Is there a recommended way to resume from the offset where it stopped?
>>     >
>>     > Thanks,
>>     > Jins George
>>
>>
>>
>>
>> --
>> ----
>> Mingmin
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
