Upon further testing, I noticed that Flink did not pass checkpoint state
to UnboundedSource.createReader() from UnboundedSourceWrapper.open().
In my case checkpoints are written to HDFS; however, if I stop or kill the
job, the checkpoints are cleared from HDFS. That may be why no state was
passed to UnboundedSource.createReader() when the job was started again.
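For reference, this is roughly how I am enabling checkpointing on the Flink
runner. A minimal sketch only: the interval and the stand-in transform are
placeholders, and the option names are the ones exposed by FlinkPipelineOptions
rather than anything specific to my job (newer runner versions expose more
checkpoint-retention settings than the versions I'm on):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class CheckpointedPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Checkpoint every 60s. As far as I can tell, without Flink checkpointing
    // enabled the UnboundedSourceWrapper never snapshots the reader's
    // CheckpointMark, so there is nothing to restore on restart.
    options.setCheckpointingInterval(60_000L);

    Pipeline p = Pipeline.create(options);
    // Stand-in transform; the real job is Kafka -> transforms -> HBase.
    p.apply(Create.of("placeholder"));
    p.run();
  }
}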
I am currently using Flink 1.1.3 and Beam 0.3. I will upgrade to Beam
0.6.0 and Flink 1.2 and run the tests again.
Thanks,
Jins George
On 03/21/2017 09:39 AM, Dan Halperin wrote:
hey,
The native Beam UnboundedSource API supports resuming from a checkpoint
-- that specifically happens here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
when the KafkaCheckpointMark is non-null.
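In other words, the contract looks roughly like the sketch below. This is a
simplified paraphrase with made-up names (OffsetsMark, startingOffsets), not
the actual KafkaIO source; the real entry point is
UnboundedSource.createReader(PipelineOptions, CheckpointMarkT):

import java.util.HashMap;
import java.util.Map;

public class ResumeContractSketch {

  /** Stand-in for KafkaCheckpointMark: partition -> next offset to read. */
  static class OffsetsMark {
    final Map<Integer, Long> nextOffsets = new HashMap<>();
  }

  /** The decision createReader() boils down to when a reader is (re)created. */
  static Map<Integer, Long> startingOffsets(OffsetsMark checkpointMark) {
    if (checkpointMark != null) {
      // A restored mark: resume exactly where the last snapshot left off.
      return checkpointMark.nextOffsets;
    }
    // No restored state: fall back to the configured start position
    // (earliest/latest); from the source's point of view a lost checkpoint
    // is indistinguishable from a cold start.
    return new HashMap<>();
  }
}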
The FlinkRunner should be providing the KafkaCheckpointMark from the
most recent savepoint upon restore.
There shouldn't be any "special" Flink runner support needed, nor is
the State API involved.
Dan
On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]> wrote:
Wouldn't it be Flink-runner specific?
Maybe the State API could do the same in a runner-agnostic way
(just thinking out loud)?
Regards
JB
On 03/21/2017 04:56 PM, Mingmin Xu wrote:
From KafkaIO itself, it looks like it can either start_from_beginning or
start_from_latest. It's designed to leverage
`UnboundedSource.CheckpointMark` during initialization, but so far I don't
see it being provided by runners. At the moment Flink savepoints are a good
option; I created a JIRA (BEAM-1775
<https://issues.apache.org/jira/browse/BEAM-1775>) to handle
it in KafkaIO.
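For completeness, when no CheckpointMark is restored the start position is
driven by the Kafka consumer config, along these lines. The builder method
names below are from a newer KafkaIO than the releases discussed in this
thread, and the broker/topic values are placeholders:

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaStartPosition {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")             // placeholder broker
        .withTopic("events")                             // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // With no restored CheckpointMark, this consumer property decides
        // between start_from_beginning ("earliest") and start_from_latest
        // ("latest").
        .withConsumerConfigUpdates(
            Collections.<String, Object>singletonMap(
                "auto.offset.reset", "earliest")));

    p.run().waitUntilFinish();
  }
}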
Mingmin
On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek
<[email protected]> wrote:
Hi,
Are you using Flink savepoints [1] when restoring your application? If you
use them, the Kafka offsets should be stored in state and the job should
restart from the correct position.
Best,
Aljoscha
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
>
> Hello,
>
> I am writing a Beam streaming pipeline with the Flink runner to consume
> data from Kafka, apply some transformations, and persist to HBase.
>
> If I restart the application (due to a failure or a manual restart), the
> consumer does not resume from the offset where it was prior to the
> restart. It always resumes from the latest offset.
>
> If I enable Flink checkpointing with the HDFS state backend, the system
> appears to resume from the earliest offset.
>
> Is there a recommended way to resume from the offset where it stopped?
>
> Thanks,
> Jins George
--
----
Mingmin
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com