Re: Controlling the amount of checkpoint files

2019-06-15 Thread Boris Lublinsky
So if you have externalized checkpoints, they are never purged?
The issue is that if your state size is rather large, this seems to be the only 
option.
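
If retained checkpoints have to stay enabled, one option is to purge old ones outside Flink. Below is a minimal sketch, assuming the usual layout of one sub-directory per job ID under the checkpoint root (here file:///flink/checkpoints) and assuming that directory modification time is a good enough proxy for which jobs are the most recent. The object and method names are hypothetical, and any directory you still intend to restore from should be excluded before deleting anything.

```scala
import java.io.File

// Hypothetical helper, not part of Flink: prunes old per-job checkpoint directories.
object CheckpointPruner {

  // Given (directory name, last-modified millis) pairs, return the names that
  // fall outside the `keep` most recently modified entries, newest first.
  def staleDirs(dirs: Seq[(String, Long)], keep: Int): Seq[String] =
    dirs.sortBy(_._2)(Ordering[Long].reverse).drop(keep).map(_._1)

  // Delete a file or a directory tree.
  def deleteRecursively(f: File): Unit = {
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Delete all but the `keep` newest job directories under `root`
  // and return the names of the directories that were removed.
  def prune(root: File, keep: Int): Seq[String] = {
    val jobDirs = Option(root.listFiles).map(_.toSeq).getOrElse(Seq.empty).filter(_.isDirectory)
    val stale = staleDirs(jobDirs.map(d => (d.getName, d.lastModified)), keep)
    stale.foreach(name => deleteRecursively(new File(root, name)))
    stale
  }
}
```

For example, CheckpointPruner.prune(new File("/flink/checkpoints"), 5) would keep the five most recently modified job directories. Run it only when you are sure none of the older directories belongs to a job that is still running.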

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jun 15, 2019, at 3:39 AM, Robert Metzger wrote:
> 
> Hey Boris,
> 
> I think the problem is that you are using externalized checkpoints:
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> Your checkpoints are retained in both failure and cancellation cases, so the 
> checkpoint files will grow indefinitely.
> 
> 
> 
> On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu wrote:
> Hi Boris
> For the configuration you gave, you can try to reduce the parallelism of the 
> operator that holds the state.
> 
> Best,
> Congxian
> 
> 
> On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky wrote:
> Here is the code enabling checkpointing:
> 
> // Enable checkpointing
> env.enableCheckpointing(60000)   // 1 min (interval is in milliseconds)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
> env.setStateBackend(checkpointingBackend)
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu wrote:
>> 
>> Hi
>> 
>> Which state backend (heap or RocksDB) and checkpoint mode (full snapshot or 
>> incremental) do you use?
>> 
>> Best,
>> Congxian
>> 
>> 
>> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky wrote:
>> Is there a way to limit the number of checkpoint files?
>> The parameter that I set, state.checkpoints.num-retained: 5,
>> does not seem to have any effect. Is there anything else I can set to 
>> prevent unbounded growth of the checkpoint data?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 
>> https://www.lightbend.com/ 
> 



Re: Controlling the amount of checkpoint files

2019-06-15 Thread Robert Metzger
Hey Boris,

I think the problem is that you are using externalized checkpoints:

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

Your checkpoints are retained in both failure and cancellation cases, so
the checkpoint files will grow indefinitely.
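
If the checkpoints only need to survive job failures, one alternative is the DELETE_ON_CANCELLATION cleanup mode, which removes the externalized checkpoint when the job is cancelled and still keeps it when the job fails. Below is a minimal sketch of the same setup with that mode, assuming the Flink 1.x Scala API used in the thread and a 60000 ms interval to match the "1 min" comment. The object and method names are hypothetical.

```scala
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper around the checkpoint settings quoted in this thread.
object CheckpointSetup {

  def configure(env: StreamExecutionEnvironment): Unit = {
    // Checkpoint once a minute; the interval is given in milliseconds.
    env.enableCheckpointing(60000L)

    val config = env.getCheckpointConfig
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    config.setMaxConcurrentCheckpoints(1)

    // Delete the externalized checkpoint on cancellation; it is still kept on failure.
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    // Same file system backend as in the original code, with asynchronous snapshots.
    val backend: StateBackend = new FsStateBackend("file:///flink/checkpoints", true)
    env.setStateBackend(backend)
  }
}
```

With this mode a cancelled job can no longer be resumed from its last checkpoint, so taking a savepoint before cancelling is the usual way to keep a restore point.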



On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu wrote:

> Hi Boris
> For the configuration you gave, you can try to reduce the parallelism of the
> operator that holds the state.
>
> Best,
> Congxian
>
>
> On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky wrote:
>
>> Here is the code enabling checkpointing:
>>
>> // Enable checkpointing
>> env.enableCheckpointing(60000)   // 1 min (interval is in milliseconds)
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
>> env.setStateBackend(checkpointingBackend)
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu wrote:
>>
>> Hi
>>
>> Which state backend (heap or RocksDB) and checkpoint mode (full snapshot or
>> incremental) do you use?
>>
>> Best,
>> Congxian
>>
>>
>> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky wrote:
>>
>>> Is there a way to limit the number of checkpoint files?
>>> The parameter that I set, state.checkpoints.num-retained: 5,
>>> does not seem to have any effect. Is there anything else I can set to
>>> prevent unbounded growth of the checkpoint data?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
>>>
>>>
>>


Re: Controlling the amount of checkpoint files

2019-06-12 Thread Congxian Qiu
Hi Boris
For the configuration you gave, you can try to reduce the parallelism of the
operator that holds the state.

Best,
Congxian


On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky wrote:

> Here is the code enabling checkpointing:
>
> // Enable checkpointing
> env.enableCheckpointing(60000)   // 1 min (interval is in milliseconds)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
> env.setStateBackend(checkpointingBackend)
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Jun 10, 2019, at 1:07 AM, Congxian Qiu wrote:
>
> Hi
>
> Which state backend (heap or RocksDB) and checkpoint mode (full snapshot or
> incremental) do you use?
>
> Best,
> Congxian
>
>
> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky wrote:
>
>> Is there a way to limit the number of checkpoint files?
>> The parameter that I set, state.checkpoints.num-retained: 5,
>> does not seem to have any effect. Is there anything else I can set to
>> prevent unbounded growth of the checkpoint data?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>


Controlling the amount of checkpoint files

2019-06-03 Thread Boris Lublinsky
Is there a way to limit the number of checkpoint files?
The parameter that I set, state.checkpoints.num-retained: 5,
does not seem to have any effect. Is there anything else I can set to prevent 
unbounded growth of the checkpoint data?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/