Hi,

When you are restoring from a savepoint (or checkpoint), the offsets committed 
to Kafka are completely ignored. Flink checkpoints the consumer offsets at the 
time the checkpoint/savepoint is taken, and those stored offsets are used as 
the read positions when restoring.
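
As a minimal sketch of how this plays out (the topic name, properties, and 
checkpoint interval below are only illustrative): setStartFromEarliest() only 
matters for a fresh start; on restore, the offsets in the snapshot win.

    // Offsets become part of Flink's checkpointed state once checkpointing is on.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000) // checkpoint every 10 s (illustrative interval)

    val consumer = new FlinkKafkaConsumer08[String](
      "input-topic",           // illustrative topic name
      new SimpleStringSchema,
      properties)              // assumed to be defined elsewhere

    // Only applies when the job starts without checkpoint/savepoint state;
    // when restoring, the offsets stored in the snapshot take precedence.
    consumer.setStartFromEarliest()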

Best,
Aljoscha

> On 11. Oct 2017, at 12:58, Rahul Raj <rahulrajms...@gmail.com> wrote:
> 
> Changing the group id didn't work for me; using setStartFromEarliest() on the 
> Kafka consumer worked instead. But it created one confusion: in case of failure, 
> if I start from a particular checkpoint or savepoint, will the application start 
> reading messages from the offset at which the checkpoint/savepoint was created, 
> or will it start reading from the first record in the Kafka partition?
> 
> Rahul Raj 
> 
> On 11 October 2017 at 15:44, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> I think the problem is that your Kafka consumer has the same group-id across 
> those two runs. This means that it will pick up the last "read position" of 
> the previous run, and thus not read anything. If you change the group-id for 
> the second run you should be able to read your data again.
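> 
> A minimal sketch of giving the second run a fresh group id via the consumer 
> properties (the values below are only examples):
> 
>     val properties = new java.util.Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092") // example broker
>     properties.setProperty("zookeeper.connect", "localhost:2181") // needed by the 0.8 consumer
>     properties.setProperty("group.id", "my-job-run-2")            // fresh group id per run
> 
>     val consumer = new FlinkKafkaConsumer08[String](
>       "input-topic", new SimpleStringSchema, properties)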
> 
> Best,
> Aljoscha
> 
> 
>> On 11. Oct 2017, at 06:19, Rahul Raj <rahulrajms...@gmail.com 
>> <mailto:rahulrajms...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> I have written a program which reads data from Kafka, parses the JSON, and 
>> does some reduce operations. The problem I am facing is that the program runs 
>> perfectly the first time on a given day, but when I kill it and run it again, 
>> an empty output file is created. Even after recompiling and rerunning, the 
>> file stays empty.
>> 
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.watermark.Watermark
>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> 
>> // params, pattern, json_decode and path are defined elsewhere in the program.
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 
>> val kafkaConsumer = new FlinkKafkaConsumer08(
>>   params.getRequired("input-topic"),
>>   new SimpleStringSchema,
>>   params.getProperties)
>> 
>> // Keep only the messages containing the pattern.
>> val messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))
>> 
>> // Extract the event timestamp from each JSON record and emit a
>> // watermark tracking the highest timestamp seen so far.
>> val mts = messageStream.assignTimestampsAndWatermarks(
>>   new AssignerWithPeriodicWatermarks[String] {
>>     var ts = Long.MinValue
>> 
>>     override def extractTimestamp(element: String,
>>                                   previousElementTimestamp: Long): Long = {
>>       val timestamp = json_decode(element).toLong
>>       ts = Math.max(timestamp, ts)
>>       timestamp
>>     }
>> 
>>     override def getCurrentWatermark(): Watermark = new Watermark(ts)
>>   })
>> 
>> // Key by the decoded JSON value, group into 60-second event-time
>> // session windows, and concatenate the grouped records.
>> val output = mts
>>   .keyBy(t => json_decode(t))
>>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>   .allowedLateness(Time.seconds(5))
>>   .reduce((v1, v2) => v1 + "----" + v2)
>> 
>> output.writeAsText(path).setParallelism(1)
>> 
>> 
>> 
>> I am using the filesystem state backend. I assume this problem is related to 
>> memory cleanup, but I don't understand what's happening.
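>> 
>> For reference, a minimal sketch of how the filesystem state backend is set up 
>> (the checkpoint directory is a made-up example):
>> 
>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend
>> 
>>     // Checkpoints (including the Kafka offsets) are written here.
>>     env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))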
>> 
>> Any help?
>> 
>> 
>> 
>> Rahul Raj
>> 
>> 
>> 
> 
> 
