Upon further testing, I noticed that Flink did not pass checkpoint state to UnboundedSource.createReader() from UnboundedSourceWrapper.open(). In my case checkpoints are written to HDFS; however, if I stop or kill the job, the checkpoints are cleared from HDFS. That may be why no state was passed to UnboundedSource.createReader() when the job was started again.

I am currently using Flink 1.1.3 and Beam 0.3. I will upgrade to Beam 0.6.0 and Flink 1.2 and run the tests again.
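For reference, the restore behavior I'm testing looks roughly like this. This is only a simplified sketch with made-up stand-in types (OffsetMark, startOffset), not the actual Beam UnboundedSource/CheckpointMark API; it just illustrates why a null checkpoint mark makes the reader fall back to its configured start position:

```java
import java.util.HashMap;
import java.util.Map;

public class ResumeSketch {
    // Hypothetical checkpoint mark: last committed offset per partition.
    static class OffsetMark {
        final Map<Integer, Long> offsets = new HashMap<>();
    }

    // Mimics the shape of UnboundedSource.createReader(options, checkpointMark):
    // a null mark means "no restored state", so the reader falls back to its
    // configured start position (latest, in this sketch).
    static long startOffset(OffsetMark mark, int partition, long latest) {
        if (mark == null || !mark.offsets.containsKey(partition)) {
            return latest;                      // fresh start, no state restored
        }
        return mark.offsets.get(partition) + 1; // resume after last committed offset
    }

    public static void main(String[] args) {
        OffsetMark mark = new OffsetMark();
        mark.offsets.put(0, 41L);
        System.out.println(startOffset(mark, 0, 100L)); // resumes at 42
        System.out.println(startOffset(null, 0, 100L)); // falls back to 100
    }
}
```

So if the checkpoint files are gone from HDFS before restart, the runner has nothing to hand to createReader() and the source starts from its default position.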

Thanks,
Jins George

On 03/21/2017 09:39 AM, Dan Halperin wrote:
Hey,

The native Beam UnboundedSource API supports resuming from checkpoint -- that specifically happens here <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674> when the KafkaCheckpointMark is non-null.

The FlinkRunner should be providing the KafkaCheckpointMark from the most recent savepoint upon restore.

There shouldn't be any "special" Flink runner support needed, nor is the State API involved.
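For what it's worth, the savepoint round trip on the Flink side would look something like the following. The job ID, savepoint directory, savepoint name, and jar path are all placeholders; the flags are per the Flink CLI:

```shell
# Trigger a savepoint for a running job
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Or cancel the job and take a savepoint in one step (Flink 1.2+)
bin/flink cancel -s hdfs:///flink/savepoints <jobId>

# Resubmit, restoring state (including the Kafka checkpoint mark)
# from the savepoint path printed by the commands above
bin/flink run -s <savepointPath> target/my-beam-pipeline.jar
```

If the job is restarted without `-s`, no state is restored and the source starts from its configured position.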

Dan

On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]> wrote:

    Wouldn't it be Flink-runner specific?

    Maybe the State API could do the same in a runner-agnostic way
    (just thinking out loud)?

    Regards
    JB

    On 03/21/2017 04:56 PM, Mingmin Xu wrote:

        From KafkaIO itself, it looks like it either starts from the
        beginning or from the latest offset. It's designed to leverage
        `UnboundedSource.CheckpointMark` during initialization, but so
        far I don't see it provided by runners. At the moment Flink
        savepoints are a good option; I created a JIRA (BEAM-1775
        <https://issues.apache.org/jira/browse/BEAM-1775>) to handle
        it in KafkaIO.

        Mingmin

        On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek
        <[email protected]> wrote:

            Hi,
            Are you using Flink savepoints [1] when restoring your
            application? If you use this, the Kafka offsets should be
            stored in state and it should restart from the correct
            position.

            Best,
            Aljoscha

            [1]
            https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
            > On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
            >
            > Hello,
            >
            > I am writing a Beam pipeline (streaming) with the Flink
            > runner to consume data from Kafka, apply some
            > transformations, and persist to HBase.
            >
            > If I restart the application (due to failure/manual
            > restart), the consumer does not resume from the offset
            > where it was prior to restart. It always resumes from
            > the latest offset.
            >
            > If I enable Flink checkpointing with the HDFS state
            > back-end, the system appears to resume from the
            > earliest offset.
            >
            > Is there a recommended way to resume from the offset
            > where it was stopped?
            >
            > Thanks,
            > Jins George




        --
        ----
        Mingmin


    --
    Jean-Baptiste Onofré
    [email protected]
    http://blog.nanthrax.net
    Talend - http://www.talend.com
