Re: Are savepoints / checkpoints co-ordinated?

2018-10-22 Thread Kostas Kloudas
Hi Anand,

Did the suggestion solve your issue?

Essentially, when you cancel with a savepoint, as Congxian suggested, no further 
checkpoints are triggered, but data keeps flowing from the source to the sink 
until the job is actually cancelled. So if you do not set the producer to 
exactly-once, you will almost certainly end up with duplicates.
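
For illustration only, a minimal sketch of an end-to-end exactly-once setup with 
the 0.11 connector could look like the following (assuming the 1.6 API; the topic 
name, broker address, checkpoint interval and class name are placeholders, not 
anything from Anand's job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CheckpointingMode.EXACTLY_ONCE governs state consistency only; the
        // delivery guarantee of the Kafka sink is chosen separately below.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Keep this at or below the broker's transaction.max.timeout.ms
        // (15 minutes by default), or the transactional producer will not start.
        producerConfig.setProperty("transaction.timeout.ms", "900000");

        DataStream<String> result = env.fromElements("a", "b", "c"); // stand-in for the real pipeline

        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "sink-topic",                                                     // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);                     // instead of AT_LEAST_ONCE

        result.addSink(producer);
        env.execute("exactly-once sink sketch");
    }
}

With EXACTLY_ONCE the sink writes into Kafka transactions that are committed only 
when a checkpoint (or the savepoint) completes, so a downstream consumer reading 
with isolation.level=read_committed will not see the records that would otherwise 
show up as duplicates after restoring.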

Cheers,
Kostas

> On Oct 13, 2018, at 2:23 PM, vino yang  wrote:
> 
> Hi Anand,
> 
> About "Cancel with savepoint" congxian is right.
> 
> And for the duplicates, You should use kafka producer transaction (since 
> 0.11) provided EXACTLY_ONCE semantic[1].
> 
> Thanks, vino.
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html#kafka-011
>  
> 
> 
> 
> Congxian Qiu <qcx978132...@gmail.com> wrote on Friday, October 12, 2018 at 7:55 PM:
> AFAIK, "Cancel Job with Savepoint" will stop the checkpoint scheduler --> trigger 
> a savepoint, then cancel your job. There will be no more checkpoints.
> 
> <anand.gopin...@ubs.com> wrote on Friday, October 12, 2018 at 1:30 AM:
> Hi,
> 
>  
> 
> I had a couple of questions about savepoints / checkpoints.
> 
>  
> 
> When I issue "Cancel Job with Savepoint", how is that instruction 
> co-ordinated with checkpoints? Am I certain the savepoint will be the last 
> operation (i.e. no more checkpoints)?
> 
>  
> 
> I have a Kafka source -> operation -> Kafka sink job in Flink, and it looks like on 
> restart from the savepoint there are duplicates written to the sink topic in 
> Kafka. The duplicates overlap with the last few events prior to the savepoint, and I 
> am trying to work out what could have happened.
> 
> My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but I call 
> env.enableCheckpointing(parameters.getInt("checkpoint.interval"), 
> CheckpointingMode.EXACTLY_ONCE).
> 
> I thought at-least-once still implies that flushes to Kafka only occur with 
> a checkpoint.
> 
>  
> 
> One theory is that a further checkpoint occurred after / during the savepoint, 
> which would have flushed events to Kafka that are not in my savepoint.
> 
>  
> 
> Any pointers to schoolboy errors I may have made would be appreciated.
> 
>  
> 
> -
> 
> Also, am I right in thinking that if I have managed state with the RocksDB 
> backend that is using 1 GB on disk, but substantially less keyed state in memory, 
> a savepoint needs to save the full 1 GB to complete?
> 
>  
> 
> 
> Regards
> 
> Anand
> 
> 
> 
> -- 
> Blog:http://www.klion26.com 
> GTalk:qcx978132955
> Everything follows the heart



Re: Are savepoints / checkpoints co-ordinated?

2018-10-13 Thread vino yang
Hi Anand,

About "Cancel with savepoint" congxian is right.

And for the duplicates, You should use kafka producer transaction (since
0.11) provided EXACTLY_ONCE semantic[1].

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html#kafka-011
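
For a small, hedged illustration of what that usually involves beyond the sink's 
Semantic argument shown in [1] (the broker address, group id and timeout value 
below are placeholders, not taken from this thread):

import java.util.Properties;

public class KafkaTransactionConfigSketch {

    // Producer properties passed to the FlinkKafkaProducer011 sink.
    // transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms
    // (15 minutes by default), or the transactional producer will fail to initialise.
    static Properties producerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }

    // Consumer properties for whatever reads the sink topic downstream.
    // With the default isolation.level (read_uncommitted), records from aborted
    // Flink transactions are also visible, which breaks end-to-end exactly-once.
    static Properties downstreamConsumerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "downstream-consumer");     // placeholder group
        props.setProperty("isolation.level", "read_committed");
        return props;
    }
}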


Congxian Qiu wrote on Friday, October 12, 2018 at 7:55 PM:

> AFAIK, "Cancel Job with Savepoint" will stop the checkpoint scheduler -->
> trigger a savepoint, then cancel your job. There will be no more checkpoints.
>
> <anand.gopin...@ubs.com> wrote on Friday, October 12, 2018 at 1:30 AM:
>
>> Hi,
>>
>>
>>
>> I had a couple of questions about savepoints / checkpoints.
>>
>>
>>
>> When I issue "Cancel Job with Savepoint", how is that instruction
>> co-ordinated with checkpoints? Am I certain the savepoint will be the last
>> operation (i.e. no more checkpoints)?
>>
>>
>>
>> I have a Kafka source -> operation -> Kafka sink job in Flink, and it looks like
>> on restart from the savepoint there are duplicates written to the sink
>> topic in Kafka. The duplicates overlap with the last few events prior to the
>> savepoint, and I am trying to work out what could have happened.
>>
>> My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but I call
>> env.enableCheckpointing(parameters.getInt("checkpoint.interval"),
>> CheckpointingMode.EXACTLY_ONCE).
>>
>> I thought at-least-once still implies that flushes to Kafka only occur
>> with a checkpoint.
>>
>>
>>
>> One theory is that a further checkpoint occurred after / during the savepoint,
>> which would have flushed events to Kafka that are not in my savepoint.
>>
>>
>>
>> Any pointers to schoolboy errors I may have made would be appreciated.
>>
>>
>>
>> -
>>
>> Also, am I right in thinking that if I have managed state with the RocksDB
>> backend that is using 1 GB on disk, but substantially less keyed state in memory,
>> a savepoint needs to save the full 1 GB to complete?
>>
>>
>>
>> Regards
>>
>> Anand
>>
>
>
> --
> Blog:http://www.klion26.com
> GTalk:qcx978132955
> Everything follows the heart
>


Re: Are savepoints / checkpoints co-ordinated?

2018-10-12 Thread Congxian Qiu
AFAIK, "Cancel Job with Savepoint" will stop the checkpoint scheduler -->
trigger a savepoint, then cancel your job. There will be no more checkpoints.
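
For reference, the usual way to trigger this (assuming the Flink 1.6 CLI) is:

    ./bin/flink cancel -s <savepoint-target-directory> <jobId>

Because the periodic checkpoint scheduler is stopped first, the savepoint is the 
job's last snapshot; anything the sink pushed to Kafka between that savepoint and 
the actual cancellation is what can reappear as duplicates after a restore with an 
at-least-once producer.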

<anand.gopin...@ubs.com> wrote on Friday, October 12, 2018 at 1:30 AM:

> Hi,
>
>
>
> I had a couple of questions about savepoints / checkpoints.
>
>
>
> When I issue "Cancel Job with Savepoint", how is that instruction
> co-ordinated with checkpoints? Am I certain the savepoint will be the last
> operation (i.e. no more checkpoints)?
>
>
>
> I have a Kafka source -> operation -> Kafka sink job in Flink, and it looks like
> on restart from the savepoint there are duplicates written to the sink
> topic in Kafka. The duplicates overlap with the last few events prior to the
> savepoint, and I am trying to work out what could have happened.
>
> My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but I call
> env.enableCheckpointing(parameters.getInt("checkpoint.interval"),
> CheckpointingMode.EXACTLY_ONCE).
>
> I thought at-least-once still implies that flushes to Kafka only occur
> with a checkpoint.
>
>
>
> One theory is that a further checkpoint occurred after / during the savepoint,
> which would have flushed events to Kafka that are not in my savepoint.
>
>
>
> Any pointers to schoolboy errors I may have made would be appreciated.
>
>
>
> -
>
> Also, am I right in thinking that if I have managed state with the RocksDB
> backend that is using 1 GB on disk, but substantially less keyed state in memory,
> a savepoint needs to save the full 1 GB to complete?
>
>
>
> Regards
>
> Anand
>


-- 
Blog:http://www.klion26.com
GTalk:qcx978132955
Everything follows the heart