Hi,

When you are restoring from a savepoint (or checkpoint), the offsets stored in Kafka are completely ignored. Flink checkpoints the consumer's offsets at the time the checkpoint/savepoint is taken, and those checkpointed offsets are used as the read positions when restoring.
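For reference, here is a minimal sketch of how this fits together; the broker address, group id, topic name, and checkpoint interval below are placeholders I made up, not anything from your setup:

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object RestoreSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 10s so the consumer's offsets become part of Flink state.
    env.enableCheckpointing(10000)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("zookeeper.connect", "localhost:2181") // required by the 0.8 consumer
    props.setProperty("group.id", "my-group")                // placeholder

    val consumer = new FlinkKafkaConsumer08[String](
      "my-topic", new SimpleStringSchema, props)             // placeholder topic

    // Startup modes like this one only apply when the job starts WITHOUT
    // restored state. On restore from a checkpoint/savepoint, the offsets
    // stored in the snapshot win and this setting is ignored.
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()
    env.execute("restore-semantics-sketch")
  }
}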
Best,
Aljoscha

> On 11. Oct 2017, at 12:58, Rahul Raj <rahulrajms...@gmail.com> wrote:
>
> Changing the group id didn't work for me; instead, using
> setStartFromEarliest() on the Kafka consumer worked. But it created one
> point of confusion: in case of failure, if I start from a particular
> checkpoint or savepoint, will the application start reading messages from
> the offset at which the checkpoint/savepoint was created, or will it start
> reading from the first record in the Kafka partition?
>
> Rahul Raj
>
> On 11 October 2017 at 15:44, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> I think the problem is that your Kafka consumer has the same group id across
> those two runs. This means that it will pick up the last "read position" of
> the previous run, and thus not read anything. If you change the group id for
> the second run, you should be able to read your data again.
>
> Best,
> Aljoscha
>
>> On 11. Oct 2017, at 06:19, Rahul Raj <rahulrajms...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have written a program which reads data from Kafka, parses the JSON, and
>> does a reduce operation. The problem I am facing is that the program runs
>> perfectly the first time on a day, but when I kill it and execute it again,
>> an empty file is created. Even after compiling again and running, an empty
>> file is created.
>>
>> var kafkaConsumer = new FlinkKafkaConsumer08(
>>   params.getRequired("input-topic"),
>>   new SimpleStringSchema,
>>   params.getProperties)
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> var messageStream = env.addSource(kafkaConsumer)
>>   .filter(t => t.contains(pattern))
>>
>> var mts = messageStream.assignTimestampsAndWatermarks(
>>   new AssignerWithPeriodicWatermarks[String] {
>>     var ts = Long.MinValue
>>
>>     override def extractTimestamp(element: String,
>>                                   previousElementTimestamp: Long): Long = {
>>       // Event timestamp parsed out of the JSON payload.
>>       var timestamp = json_decode(element).toLong
>>       ts = Math.max(timestamp, previousElementTimestamp)
>>       timestamp
>>     }
>>
>>     override def getCurrentWatermark(): Watermark = {
>>       new Watermark(ts)
>>     }
>>   })
>>
>> var output = mts
>>   .keyBy(t => json_decode(t))
>>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>   .allowedLateness(Time.seconds(5))
>>   .reduce((v1, v2) => v1 + "----" + v2)
>>
>> output.writeAsText(path).setParallelism(1)
>>
>> I am using the FileSystem state backend. I am assuming this problem is
>> related to memory cleaning, but I don't understand what's happening.
>>
>> Any help?
>>
>> Rahul Raj
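P.S. A side note on the watermark assigner in the quoted code: instead of hand-rolling an AssignerWithPeriodicWatermarks with mutable state, Flink ships BoundedOutOfOrdernessTimestampExtractor for exactly this pattern. A minimal sketch, where jsonDecode is a hypothetical stand-in for the json_decode helper from the quoted code:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkSketch {
  // Stand-in for the json_decode helper in the quoted code, assumed to
  // extract the event timestamp (as a string) from the JSON payload.
  def jsonDecode(element: String): String = ???

  def withTimestamps(messageStream: DataStream[String]): DataStream[String] =
    messageStream.assignTimestampsAndWatermarks(
      // The watermark trails the highest timestamp seen so far by 5 seconds,
      // so events up to 5s out of order still land in the right session
      // window, with no mutable state to maintain by hand.
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
        override def extractTimestamp(element: String): Long =
          jsonDecode(element).toLong
      })
}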