Re: [DISCUSS] FLIP-193: Snapshots ownership

2022-03-29 Thread Dawid Wysakowicz
5. Enforcing re-upload by a single task and Changelog state backend
With the Changelog state backend, a file can be shared by multiple operators.
Therefore, getIntersection() is irrelevant here, because operators
might not be sharing any key groups
(so we'll have to analyze "raw" file usage, I think).

6. Enforcing re-upload by a single task and skew
If we use some greedy logic like "subtask 0 always re-uploads", then that
subtask might be overloaded.
So we'll have to obtain a full list of subtasks first (and then probably
choose randomly or round-robin).
However, that requires rebuilding the Task snapshot, which is doable but
not trivial (and which I think supports the "reverse API" option).
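The skew concern in point 6 can be sketched as follows — an illustrative sketch in plain Python (not Flink code), assuming the coordinator already knows the full list of subtasks sharing the files:

```python
import random


def pick_reupload_subtask(sharing_subtasks, checkpoint_id, strategy="round-robin"):
    """Choose which subtask re-uploads the shared artifacts for a checkpoint.

    Greedily picking subtask 0 every time would overload it; rotating the
    choice per checkpoint (or picking randomly) spreads the extra upload
    work across all subtasks that share the files.
    """
    if not sharing_subtasks:
        raise ValueError("no subtasks share the artifacts")
    if strategy == "round-robin":
        return sharing_subtasks[checkpoint_id % len(sharing_subtasks)]
    return random.choice(sharing_subtasks)
```

Either strategy avoids the hot spot; round-robin has the nice property of being deterministic given the checkpoint id, so no extra coordination state is needed.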

7. I think it would be helpful to list file systems / object stores
that support "fast" copy (ideally with latency numbers).
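As a rough illustration of why the distinction in point 7 matters, compare a store with a server-side copy call (e.g. S3's CopyObject) against one where duplication means downloading and re-uploading the bytes. This is a toy in-memory model for counting transferred bytes, not a real storage client:

```python
class ToyObjectStore:
    """In-memory stand-in for an object store; counts transferred bytes
    as a crude proxy for the latency/cost of a duplication call."""

    def __init__(self, supports_fast_copy):
        self.supports_fast_copy = supports_fast_copy
        self.objects = {}
        self.bytes_transferred = 0

    def put(self, key, data):
        self.bytes_transferred += len(data)
        self.objects[key] = bytes(data)

    def duplicate(self, src, dst):
        if self.supports_fast_copy:
            # Server-side copy: only metadata moves, the payload stays put.
            self.objects[dst] = self.objects[src]
        else:
            # No fast copy available: the client must re-upload the payload.
            data = self.objects[src]
            self.bytes_transferred += len(data)
            self.objects[dst] = bytes(data)
```

With large SST files, the "re-upload" branch is exactly the cost the no-claim design tries to bound, which is why a per-filesystem list of fast-copy support (with latency numbers) would be valuable.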

Regards,
Roman

On Mon, Nov 22, 2021 at 9:24 AM Yun Gao wrote:

Hi,

Many thanks, Dawid, for proposing the FLIP to clarify the ownership of the
states. +1 for the overall changes, since they make the behavior clear and
provide users a deterministic way to eventually clean up savepoints / retained
checkpoints.

Regarding the changes to the public interface, it seems the changes are
currently all bound to the savepoint, but from the FLIP it seems we might
also need to support the claim declaration for retained checkpoints, as on
the CLI side [1]. If so, might it be better to change the option name from
`execution.savepoint.restore-mode` to something like `execution.restore-mode`?

Best,
Yun


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint


--
From: Konstantin Knauf
Send Time: 2021 Nov. 19 (Fri.) 16:00
To: dev
Subject: Re: [DISCUSS] FLIP-193: Snapshots ownership

Hi Dawid,

Thanks for working on this FLIP. Clarifying the differences and
guarantees around savepoints and checkpoints will make it easier and safer
for users and downstream projects and platforms to work with them.

+1 to changing the current (undefined) behavior when recovering from
retained checkpoints. Users can now choose between claiming and not
claiming, which I think will make the current mixed behavior obsolete.

Cheers,

Konstantin

On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz wrote:

Hi devs,

I'd like to bring up for discussion a proposal to clean up the ownership
of snapshots, both checkpoints and savepoints [1].

The goal is to make it clear who is responsible for deleting
checkpoint/savepoint files and when that can be done in a safe manner.

Looking forward to your feedback!

Best,

Dawid

[1] https://cwiki.apache.org/confluence/x/bIyqCw



--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk






Re: [DISCUSS] FLIP-193: Snapshots ownership

2022-03-29 Thread Hangxiang Yu
> Re. 3 I believe where you are coming from is that you'd like to keep the 
> checkpointing time minimal and reuploading files may increase it. The 
> proposal so far builds on the assumption we could in most cases use a cheap 
> duplicate API instead of re-upload. I could see this as a follow-up if it 
> becomes a bottleneck. It would be a bit invasive though, as we would have to 
> somehow keep track which files should not be reused on TMs.
>
> Re. 2 & 3 Neither of the counter proposals work well for taking incremental 
> savepoints. We were thinking of building incremental savepoints on the same 
> concept. I think delaying the completion of an independent savepoint to a 
> closer undefined future is not a nice property of savepoints.
>
> Re 4. Good point. We should make sure the first completed checkpoint has the 
> independent/full checkpoint property rather than just the first triggered.
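The semantics in "Re 4" above — keep forcing full checkpoints until one actually *completes*, not merely until the first one is *triggered* — can be sketched like this (an illustrative sketch of the intended behavior, not Flink's actual checkpoint coordinator):

```python
class NoClaimFullCheckpointGate:
    """In NO_CLAIM mode the job must stop depending on the restored
    checkpoint's files, so every checkpoint attempt is forced to be
    full/independent until one of them completes; an aborted attempt
    keeps the gate closed."""

    def __init__(self):
        self._independent_completed = False

    def must_be_full(self):
        return not self._independent_completed

    def on_completed(self, was_full):
        if was_full:
            self._independent_completed = True

    def on_aborted(self):
        pass  # aborted attempts don't count; keep forcing full checkpoints
```

The point of the fix is visible in the aborted-attempt case: if only the first *triggered* checkpoint were forced full and it aborted, the next (incremental) one could still reference the restored files.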
>
> Re. 5 & 6 I need a bit more time to look into it.
>
> Best,
>
> Dawid
>
> On 22/11/2021 11:40, Roman Khachatryan wrote:
>
> Hi,
>
> Thanks for the proposal Dawid, I have some questions and remarks:
>
> 1. How will stop-with-savepoint be handled?
> Shouldn't side effects be enforced in this case? (i.e. send
> notifyCheckpointComplete)
>
> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
> Anyways, any external tool will have to poll Flink API waiting for the
> next (full) checkpoint, before deleting the retained checkpoint,
> right?
> Instead, we can provide an API which tells whether the 1st checkpoint
> is still in use (and not force re-upload it).
>
> Under the hood, it can work like this:
> - for the checkpoint Flink recovers from, remember all shared state
> handles it is adding
> - when unregistering shared state handles, remove them from the set above
> - when the set becomes empty the 1st checkpoint can be deleted externally
>
> Besides not requiring re-upload, it seems much simpler and less invasive.
> On the downside, state deletion can be delayed; but I think this is a
> reasonable trade-off.
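The bookkeeping proposed above can be sketched in a few lines — an illustrative sketch only, assuming the shared state handles of the restored checkpoint can be enumerated at recovery time (this is not Flink's actual SharedStateRegistry):

```python
class RestoredCheckpointTracker:
    """Tracks which shared state handles of the restored (1st) checkpoint
    are still referenced by newer checkpoints; once the set drains, the
    retained checkpoint can safely be deleted externally."""

    def __init__(self, restored_handles):
        self._live = set(restored_handles)

    def on_handle_unregistered(self, handle):
        # Called when a shared handle is no longer used by any checkpoint.
        self._live.discard(handle)

    def restored_checkpoint_in_use(self):
        return bool(self._live)
```

An external tool would then poll something like `restored_checkpoint_in_use()` via an API instead of waiting for a forced full checkpoint — cheaper, but with the open-ended deletion delay discussed in the replies.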
>
> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but
> after a configured number of checkpoints?
> There is a high chance that after some more checkpoints, initial state
> will not be used (because of compaction),
> so backends won't have to re-upload anything (or small part).
>
> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion
> This should be addressed in https://issues.apache.org/jira/browse/FLINK-24611.
> If not, I think the FLIP should consider this case.
>
> 5. Enforcing re-upload by a single task and Changelog state backend
> With Changelog state backend, a file can be shared by multiple operators.
> Therefore, getIntersection() is irrelevant here, because operators
> might not be sharing any key groups.
> (so we'll have to analyze "raw" file usage I think).
>
> 6. Enforcing re-upload by a single task and skew
> If we use some greedy logic like subtask 0 always re-uploads then it
> might be overloaded.
> So we'll have to obtain a full list of subtasks first (then probably
> choose randomly or round-robin).
> However, that requires rebuilding Task snapshot, which is doable but
> not trivial (which I think supports "reverse API option").
>
> 7. I think it would be helpful to list file systems / object stores
> that support "fast" copy (ideally with latency numbers).
>
> Regards,
> Roman


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
>> I think we need this anyway if we choose to re-upload files once the
>> job is running.
>> The new checkpoint must be formed by re-uploaded old artifacts AND
>> uploaded new artifacts.
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz wrote:
>>
>> @Yun
>>
>> I think it is a good comment, with which I agree in principle. However, we
>> already use --fromSavepoint (CLI), savepointPath (REST API), and
>> SavepointRestoreSettings for both restoring from a savepoint and from an
>> externalized checkpoint; I wanted to voice that concern. Nevertheless, I am
>> fine with changing it to execution.restore-mode; if there are no other
>> comments on that matter, I will change it.
>>
>> @Roman:
>>
>> Re 1. Correct, stop-with-savepoint should commit side-effects. Will add that 
>> to the doc.
>>
>> Re. 2 What I don't like about this counter-proposal is that it still has no
>> clearly defined point in time when it is safe to delete the original
>> checkpoint. Users would have a hard time reasoning about it and debugging.
>> Even worse, I think in the worst case it might never happen that all of the
>> original files go out of use (I am not too familiar with RocksDB compaction,
>> but what happens if there are key ranges that are never accessed again?). I
>> agree it is unlikely, but it is possible, isn't it? It can definitely take
>> a significant time and many checkpoints.
>>
>> Re. 3 I believe where you are coming from is that you'd like to keep the 
>> checkpointing time minimal and reuploading files may increase it. The 
>> proposal so far builds on the assumption we could in most cases use a cheap 
>> duplicate API instead of re-upload. I could see this as a follow-up if it 
>> becomes a bottleneck. It would be a bit invasive though, as we would have to 
>> somehow keep track which files should not be reused on TMs.
>>
>> Re. 2 & 3 Neither of the counter-proposals works well for taking incremental
>> savepoints. We were thinking of building incremental savepoints on the same
>> concept, and I think delaying the completion of an independent savepoint to
>> an undefined point in the future is not a nice property for savepoints.
>>
>> Re 4. Good point. We should make sure the first completed checkpoint has the 
>> independent/full checkpoint property rather than just the first triggered.
>>
>> Re. 5 & 6 I need a bit more time to look into it.
>>
>> Best,
>>
>> Dawid




Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Till Rohrmann


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
ually stored on relatively cheap storage like S3, so
>>> some delay shouldn't be an issue (especially taking rounding into
>>> account); it can even be cheaper or comparable to paying for
>>> re-upload/duplicate calls.
>>>
>>> Infinite delay can be an issue though, I agree.
>>> Maybe @Yun can clarify the likelihood of never deleting some SST files
>>> by RocksDB?
>>> For the changelog backend, old files won't be used once
>>> materialization succeeds.
>>>
>>> Yes, my concern is checkpointing time, but also added complexity:
>>>
>>> It would be a bit invasive though, as we would have to somehow keep track 
>>> which files should not be reused on TMs.
>>>
>>> I think we need this anyway if we choose to re-upload files once the
>>> job is running.
>>> The new checkpoint must be formed by re-uploaded old artifacts AND
>>> uploaded new artifacts.
>>>
>>>
>>> Regards,
>>> Roman



Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Roman Khachatryan
ost cases use a cheap 
> > duplicate API instead of re-upload. I could see this as a follow-up if it 
> > becomes a bottleneck. It would be a bit invasive though, as we would have 
> > to somehow keep track which files should not be reused on TMs.
> >
> > Re. 2 & 3 Neither of the counter proposals work well for taking incremental 
> > savepoints. We were thinking of building incremental savepoints on the same 
> > concept. I think delaying the completion of an independent savepoint to a 
> > closer undefined future is not a nice property of savepoints.
> >
> > Re 4. Good point. We should make sure the first completed checkpoint has 
> > the independent/full checkpoint property rather than just the first 
> > triggered.
> >
> > Re. 5 & 6 I need a bit more time to look into it.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/11/2021 11:40, Roman Khachatryan wrote:
> >
> > Hi,
> >
> > Thanks for the proposal Dawid, I have some questions and remarks:
> >
> > 1. How will stop-with-savepoint be handled?
> > Shouldn't side effects be enforced in this case? (i.e. send
> > notifyCheckpointComplete)
> >
> > 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
> > Anyways, any external tool will have to poll Flink API waiting for the
> > next (full) checkpoint, before deleting the retained checkpoint,
> > right?
> > Instead, we can provide an API which tells whether the 1st checkpoint
> > is still in use (and not force re-upload it).
> >
> > Under the hood, it can work like this:
> > - for the checkpoint Flink recovers from, remember all shared state
> > handles it is adding
> > - when unregistering shared state handles, remove them from the set above
> > - when the set becomes empty the 1st checkpoint can be deleted externally
> >
> > Besides not requiring re-upload, it seems much simpler and less invasive.
> > On the downside, state deletion can be delayed; but I think this is a
> > reasonable trade-off.
> >
> > 3. Alternatively, re-upload not necessarily on 1st checkpoint, but
> > after a configured number of checkpoints?
> > There is a high chance that after some more checkpoints, initial state
> > will not be used (because of compaction),
> > so backends won't have to re-upload anything (or small part).
> >
> > 4. Re-uploaded artifacts must not be deleted on checkpoin abortion
> > This should be addressed in 
> > https://issues.apache.org/jira/browse/FLINK-24611.
> > If not, I think the FLIP should consider this case.
> >
> > 5. Enforcing re-upload by a single task and Changelog state backend
> > With Changelog state backend, a file can be shared by multiple operators.
> > Therefore, getIntersection() is irrelevant here, because operators
> > might not be sharing any key groups.
> > (so we'll have to analyze "raw" file usage I think).
> >
> > 6. Enforcing re-upload by a single task and skew
> > If we use some greedy logic like subtask 0 always re-uploads then it
> > might be overloaded.
> > So we'll have to obtain a full list of subtasks first (then probably
> > choose randomly or round-robin).
> > However, that requires rebuilding Task snapshot, which is doable but
> > not trivial (which I think supports "reverse API option").
> >
> > 7. I think it would be helpful to list file systems / object stores
> > that support "fast" copy (ideally with latency numbers).
> >
> > Regards,
> > Roman
> >
> > On Mon, Nov 22, 2021 at 9:24 AM Yun Gao  
> >   
> >  wrote:
> >
> > Hi,
> >
> > Very thanks Dawid for proposing the FLIP to clarify the ownership for the
> > states. +1 for the overall changes since it makes the behavior clear and
> > provide users a determined method to finally cleanup savepoints / retained 
> > checkpoints.
> >
> > Regarding the changes to the public interface, it seems the changes are
> > currently all bound to the savepoint, but from the FLIP it seems we might
> > also need to support the claim declaration for retained checkpoints, as on
> > the CLI side [1]. If so, might it be better to change the option name
> > from `execution.savepoint.restore-mode` to something like
> > `execution.restore-mode`?
> >
> > Best,
> > Yun
> >
> >
> > [1] 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
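To illustrate Yun's renaming suggestion, a minimal sketch of resolving the restore mode from either option key. The key names follow the discussion above; the mode values (CLAIM / NO_CLAIM / LEGACY) and the fallback behavior are assumptions of this sketch, not settled API:

```java
import java.util.Locale;
import java.util.Map;

// Illustrative sketch of resolving the restore mode from configuration,
// preferring the proposed generic key and falling back to the
// savepoint-scoped one. Key and value names may differ from what was
// finally implemented in Flink.
public class RestoreModeResolver {

    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    static RestoreMode resolve(Map<String, String> conf) {
        String value = conf.getOrDefault(
                "execution.restore-mode",
                conf.getOrDefault("execution.savepoint.restore-mode", "NO_CLAIM"));
        // Accept e.g. "no-claim" as well as "NO_CLAIM".
        return RestoreMode.valueOf(value.replace('-', '_').toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("execution.restore-mode", "CLAIM"))); // CLAIM
    }
}
```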
> >
> >
> > --
> > From: Konstantin Knauf
> > Send Time: 2021 Nov. 19 (Fri.) 16:00
> > To: dev
> > Subject: Re: [DISCUSS] FLIP-193: Snapshots ownership
> >
> > Hi Dawid,
> >
> > Thanks for working on this FLIP. Clarifying the differences and
> > guarantees around savepoints and checkpoints will make it easier and safer
> > for users and downstream projects and platforms to work with them.
> >
> > +1 to changing the current (undefined) behavior when recovering from
> > retained checkpoints. Users can now choose between claiming and not
> > claiming, which I think will make the current mixed behavior obsolete.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz wrote:
> >
> > Hi devs,
> >
> > I'd like to bring up for discussion a proposal to clean up the ownership
> > of snapshots, both checkpoints and savepoints [1].
> >
> > The goal here is to make it clear who is responsible for deleting
> > checkpoint/savepoint files and when that can be done in a safe manner.
> >
> > Looking forward to your feedback!
> >
> > Best,
> >
> > Dawid
> >
> > [1] https://cwiki.apache.org/confluence/x/bIyqCw
> >
> >
> >
> > --
> >
> > Konstantin Knauf
> > https://twitter.com/snntrable
> > https://github.com/knaufk
> >
> >
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Konstantin Knauf
> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz wrote:
>
> @Yun
>
> I think it is a good comment, which I agree with in principle. However, we
> already use --fromSavepoint (CLI), savepointPath (REST API), and
> SavepointRestoreSettings for restoring from both a savepoint and an
> externalized checkpoint; that was the concern I wanted to voice.
> Nevertheless, I am fine with changing it to execution.restore-mode; if there
> are no other comments on that matter, I will change it.
>
> @Roman:
>
> Re 1. Correct, stop-with-savepoint should commit side-effects. Will add that 
> to the doc.
>
> Re. 2 What I don't like about this counter-proposal is that it still has no
> clearly defined point in time when it is safe to delete the original
> checkpoint. Users would have a hard time reasoning about it and debugging it.
> Even worse, in the worst case it might never happen that all the original
> files fall out of use (I am not too familiar with RocksDB compaction, but
> what happens if there are key ranges that are never accessed again?). I
> agree it is unlikely, but it is possible, isn't it? It can definitely take a
> significant time and many checkpoints.
>
> Re. 3 I believe where you are coming from is that you'd like to keep the
> checkpointing time minimal, and re-uploading files may increase it. The
> proposal so far builds on the assumption that in most cases we could use a
> cheap duplicate API instead of a re-upload. I could see this as a follow-up
> if it becomes a bottleneck. It would be a bit invasive though, as we would
> have to somehow keep track of which files should not be reused on TMs.
>
> Re. 2 & 3 Neither of the counter-proposals works well for taking incremental
> savepoints. We were thinking of building incremental savepoints on the same
> concept. I think delaying the completion of an independent savepoint to an
> undefined point in the future is not a nice property of savepoints.
>
> Re 4. Good point. We should make sure that the first completed checkpoint
> has the independent/full checkpoint property, rather than just the first
> triggered one.
>
> Re. 5 & 6 I need a bit more time to look into it.
>
> Best,
>
> Dawid
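Dawid's Re. 3 assumes a cheap duplicate API where the file system supports it, with re-upload as the fallback. A minimal sketch of that decision, using a plain byte copy to stand in for the re-upload; the FileSystemOps interface and its methods are hypothetical, not Flink's FileSystem API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative sketch of the "cheap duplicate instead of re-upload" idea:
// if the target file system offers a fast server-side duplicate
// (e.g. an object-store copy call), use it; otherwise fall back to
// copying the bytes, which models a full re-upload.
public class StateFileDuplicator {

    interface FileSystemOps {
        boolean canFastDuplicate();
        void duplicate(Path src, Path dst) throws IOException;
    }

    // Fallback implementation: a plain byte copy.
    static final FileSystemOps BYTE_COPY = new FileSystemOps() {
        public boolean canFastDuplicate() { return false; }
        public void duplicate(Path src, Path dst) throws IOException {
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        }
    };

    static void makeIndependent(FileSystemOps fs, Path src, Path dst) throws IOException {
        // Either way, the new checkpoint no longer references the old file.
        fs.duplicate(src, dst);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("dup");
        Path src = Files.writeString(dir.resolve("old-sst"), "state-bytes");
        Path dst = dir.resolve("new-sst");
        makeIndependent(BYTE_COPY, src, dst);
        System.out.println(Files.readString(dst)); // state-bytes
    }
}
```

Whether the fast path exists depends on the store (point 7 above asks exactly for such a list), which is why the fallback has to stay.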
>

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-26 Thread Dawid Wysakowicz
 /
>> checkpoint completion / api response.
>> But with the current one, there is an assumption: "the initial checkpoint
>> can be deleted once a new one completes" (instead of just "the initial
>> checkpoint can be deleted once the API says it can be deleted").
>> So I think it's actually clearer to offer this explicit API and rely on it.
>>
>> Regarding delaying the deletion:
>> I agree that it can delay deletion, but how important is that?
>> Checkpoints are usually stored on relatively cheap storage like S3, so
>> some delay shouldn't be an issue (especially taking rounding into
>> account); it can even be cheaper than, or comparable to, paying for the
>> re-upload/duplicate calls.
>>
>> Infinite delay can be an issue though, I agree.
>> Maybe @Yun can clarify how likely it is that RocksDB never deletes some
>> SST files?
>> For the changelog backend, old files won't be used once
>> materialization succeeds.
>>
>> Yes, my concern is checkpointing time, but also the added complexity:
>>
>> It would be a bit invasive though, as we would have to somehow keep track 
>> which files should not be reused on TMs.
>>
>> I think we need this anyway if we choose to re-upload files once the
>> job is running.
>> The new checkpoint must be formed from re-uploaded old artifacts AND
>> newly uploaded artifacts.
>>
>>
>> Regards,
>> Roman
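Roman's explicit "still in use" API could be backed by a simple reference set over the shared state handles contributed by the restored checkpoint. A minimal sketch under that assumption (class and method names are made up for illustration, not Flink's actual API):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the "inverse control" idea: track the shared
// state handles taken over from the restored checkpoint and expose
// whether any of them are still referenced by a live checkpoint.
public class InitialCheckpointTracker {

    private final Set<String> inUse = ConcurrentHashMap.newKeySet();

    /** Called for every shared handle the restored checkpoint contributed. */
    public void register(String handleId) { inUse.add(handleId); }

    /** Called when a shared handle is unregistered (no checkpoint uses it). */
    public void unregister(String handleId) { inUse.remove(handleId); }

    /** An external tool may delete the initial checkpoint once this is true. */
    public boolean safeToDeleteInitialCheckpoint() { return inUse.isEmpty(); }

    public static void main(String[] args) {
        InitialCheckpointTracker t = new InitialCheckpointTracker();
        t.register("sst-1");
        t.register("sst-2");
        t.unregister("sst-1");
        System.out.println(t.safeToDeleteInitialCheckpoint()); // false
        t.unregister("sst-2");
        System.out.println(t.safeToDeleteInitialCheckpoint()); // true
    }
}
```

This avoids any re-upload, but as discussed above the set may drain slowly (or, in the worst case, never), which is the delayed-deletion trade-off.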
>>
>>

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Roman Khachatryan


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Dawid Wysakowicz


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Khachatryan Roman


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Piotr Nowojski

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Dawid Wysakowicz
p if it becomes a bottleneck. It would be a bit invasive though, 
>>>>>> as we would have to somehow keep track which files should not be reused 
>>>>>> on TMs.
>>>>>>
>>>>>> Re. 2 & 3 Neither of the counter proposals work well for taking 
>>>>>> incremental savepoints. We were thinking of building incremental 
>>>>>> savepoints on the same concept. I think delaying the completion of an 
>>>>>> independent savepoint to a closer undefined future is not a nice 
>>>>>> property of savepoints.
>>>>>>
>>>>>> Re 4. Good point. We should make sure the first completed checkpoint has 
>>>>>> the independent/full checkpoint property rather than just the first 
>>>>>> triggered.
>>>>>>
>>>>>> Re. 5 & 6 I need a bit more time to look into it.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 22/11/2021 11:40, Roman Khachatryan wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the proposal Dawid, I have some questions and remarks:
>>>>>>
>>>>>> 1. How will stop-with-savepoint be handled?
>>>>>> Shouldn't side effects be enforced in this case? (i.e. send
>>>>>> notifyCheckpointComplete)
>>>>>>
>>>>>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim 
>>>>>> mode?
>>>>>> Anyways, any external tool will have to poll Flink API waiting for the
>>>>>> next (full) checkpoint, before deleting the retained checkpoint,
>>>>>> right?
>>>>>> Instead, we can provide an API which tells whether the 1st checkpoint
>>>>>> is still in use (and not force re-upload it).
>>>>>>
>>>>>> Under the hood, it can work like this:
>>>>>> - for the checkpoint Flink recovers from, remember all shared state
>>>>>> handles it is adding
>>>>>> - when unregistering shared state handles, remove them from the set above
>>>>>> - when the set becomes empty the 1st checkpoint can be deleted externally
>>>>>>
>>>>>> Besides not requiring re-upload, it seems much simpler and less invasive.
>>>>>> On the downside, state deletion can be delayed; but I think this is a
>>>>>> reasonable trade-off.
>>>>>>
>>>>>> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but
>>>>>> after a configured number of checkpoints?
>>>>>> There is a high chance that after some more checkpoints, initial state
>>>>>> will not be used (because of compaction),
>>>>>> so backends won't have to re-upload anything (or small part).
>>>>>>
>>>>>> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion
>>>>>> This should be addressed in 
>>>>>> https://issues.apache.org/jira/browse/FLINK-24611.
>>>>>> If not, I think the FLIP should consider this case.
>>>>>>
>>>>>> 5. Enforcing re-upload by a single task and Changelog state backend
>>>>>> With Changelog state backend, a file can be shared by multiple operators.
>>>>>> Therefore, getIntersection() is irrelevant here, because operators
>>>>>> might not be sharing any key groups.
>>>>>> (so we'll have to analyze "raw" file usage I think).
>>>>>>
>>>>>> 6. Enforcing re-upload by a single task and skew
>>>>>> If we use some greedy logic like subtask 0 always re-uploads then it
>>>>>> might be overloaded.
>>>>>> So we'll have to obtain a full list of subtasks first (then probably
>>>>>> choose randomly or round-robin).
>>>>>> However, that requires rebuilding Task snapshot, which is doable but
>>>>>> not trivial (which I think supports "reverse API option").
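One way to avoid the skew described above is to pick the re-uploading subtask per file, out of the full list of subtasks referencing that file, e.g. by a stable hash of the file name. A hypothetical sketch (not Flink code):

```python
import zlib


def pick_reuploader(file_name, referencing_subtasks):
    """Deterministically pick one subtask (out of all that reference the
    file) to re-upload it, spreading the load across subtasks instead of
    always burdening subtask 0."""
    subtasks = sorted(referencing_subtasks)
    # Stable hash of the file name -> every JM/TM makes the same choice
    # without extra coordination.
    idx = zlib.crc32(file_name.encode()) % len(subtasks)
    return subtasks[idx]
```

Randomized choice would spread load equally well on average, but a deterministic hash avoids having to persist or communicate the assignment.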
>>>>>>
>>>>>> 7. I think it would be helpful to list file systems / object stores
>>>>>> that support "fast" copy (ideally with latency numbers).
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>> On Mon, Nov 22, 2021 at 9:24 AM Yun Gao  
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Many thanks to Dawid for proposing the FLIP to clarify the ownership of the
>>>>>> states. +1 for the overall changes, since they make the behavior clear and
>>>>>> provide users a deterministic way to finally clean up savepoints /
>>>>>> retained checkpoints.
>>>>>>
>>>>>> Regarding the changes to the public interface: the changes currently all
>>>>>> seem bound to savepoints, but from the FLIP it seems we might also need
>>>>>> to support the claim declaration for retained checkpoints, as on the CLI
>>>>>> side [1]. If so, might it be better to change the option name
>>>>>> from `execution.savepoint.restore-mode` to something like
>>>>>> `execution.restore-mode`?
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>>
>>>>>> [1] 
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
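For illustration, the two naming alternatives would look like this in the configuration — a sketch of the proposal under discussion, not final option names (the CLAIM/NO_CLAIM values follow the claiming discussion in this thread; a LEGACY value preserving today's mixed behavior is an assumption here):

```yaml
# Option name as currently proposed in the FLIP (savepoint-scoped):
execution.savepoint.restore-mode: CLAIM      # or NO_CLAIM / LEGACY

# Yun's suggested generalization, covering retained checkpoints as well:
# execution.restore-mode: CLAIM
```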
>>>>>>
>>>>>>
>>>>>> --
>>>>>> From:Konstantin Knauf 
>>>>>> Send Time:2021 Nov. 19 (Fri.) 16:00
>>>>>> To:dev 
>>>>>> Subject:Re: [DISCUSS] FLIP-193: Snapshots ownership
>>>>>>
>>>>>> Hi Dawid,
>>>>>>
>>>>>> Thanks for working on this FLIP. Clarifying the differences and
>>>>>> guarantees around savepoints and checkpoints will make it easier and 
>>>>>> safer
>>>>>> for users and downstream projects and platforms to work with them.
>>>>>>
>>>>>> +1 to changing the current (undefined) behavior when recovering from
>>>>>> retained checkpoints. Users can now choose between claiming and not
>>>>>> claiming, which I think will make the current mixed behavior obsolete.
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz 
>>>>>> wrote:
>>>>>>
>>>>>> Hi devs,
>>>>>>
>>>>>> I'd like to bring up for a discussion a proposal to clean up ownership
>>>>>> of snapshots, both checkpoints and savepoints.
>>>>>>
>>>>>> The goal here is to make it clear who is responsible for deleting
>>>>>> checkpoint/savepoint files and when that can be done in a safe manner.
>>>>>>
>>>>>> Looking forward to your feedback!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/x/bIyqCw
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Konstantin Knauf
>>>>>>
>>>>>> https://twitter.com/snntrable
>>>>>>
>>>>>> https://github.com/knaufk
>>>>>>




Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Yun Tang


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Dawid Wysakowicz
>>>> cheap duplicate API instead of re-upload. I could see this as a follow-up 
>>>> if it becomes a bottleneck. It would be a bit invasive though, as we would 
>>>> have to somehow keep track which files should not be reused on TMs.
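For illustration, the "cheap duplicate API" mentioned above could take roughly this shape — a hypothetical sketch, not Flink's actual FileSystem interface; object stores with server-side copy would implement `duplicate` natively, while others would fall back to re-uploading the bytes:

```python
import shutil
from abc import ABC, abstractmethod


class SnapshotFileSystem(ABC):
    @abstractmethod
    def can_fast_duplicate(self, src, dst):
        """True if src can be duplicated without re-reading its bytes
        (e.g. a server-side COPY operation on an object store)."""

    @abstractmethod
    def duplicate(self, src, dst):
        """Make dst an independent copy of src, so deleting the original
        snapshot no longer affects the new one."""


class LocalFs(SnapshotFileSystem):
    """Local file system: a plain copy, cheap compared to a network re-upload."""

    def can_fast_duplicate(self, src, dst):
        return True

    def duplicate(self, src, dst):
        shutil.copyfile(src, dst)
```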
>>>>
>>>> Re. 2 & 3 Neither of the counter proposals work well for taking 
>>>> incremental savepoints. We were thinking of building incremental 
>>>> savepoints on the same concept. I think delaying the completion of an 
>>>> independent savepoint to a closer undefined future is not a nice property 
>>>> of savepoints.
>>>>
>>>> Re 4. Good point. We should make sure the first completed checkpoint has 
>>>> the independent/full checkpoint property rather than just the first 
>>>> triggered.
>>>>
>>>> Re. 5 & 6 I need a bit more time to look into it.
>>>>
>>>> Best,
>>>>
>>>> Dawid


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Roman Khachatryan


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Dawid Wysakowicz


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Dawid Wysakowicz


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Roman Khachatryan


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Dawid Wysakowicz


Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Roman Khachatryan
Hi,

Thanks for the proposal, Dawid. I have some questions and remarks:

1. How will stop-with-savepoint be handled?
Shouldn't side effects be enforced in this case (i.e., shouldn't
notifyCheckpointComplete be sent)?

2. Instead of forcing re-upload, can we "invert control" in no-claim mode?
Any external tool will have to poll the Flink API waiting for the
next (full) checkpoint before deleting the retained checkpoint anyway,
right?
Instead, we could provide an API which tells whether the 1st checkpoint
is still in use (rather than forcing a re-upload of its state).

Under the hood, it can work like this:
- for the checkpoint Flink recovers from, remember all shared state
handles it is adding
- when unregistering shared state handles, remove them from the set above
- when the set becomes empty the 1st checkpoint can be deleted externally

Besides not requiring re-upload, it seems much simpler and less invasive.
On the downside, state deletion can be delayed; but I think this is a
reasonable trade-off.
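The bookkeeping described above could look roughly like the following. This is an illustrative Python sketch, not Flink code; the class and method names are invented:

```python
# Sketch of the "inverse control" idea: remember the shared state handles
# contributed by the checkpoint Flink restored from, drop them as they are
# unregistered, and report whether any of them are still referenced.
class InitialCheckpointTracker:
    def __init__(self):
        self.initial_handles = set()

    def register_initial(self, handle):
        # Called for every shared state handle the restored checkpoint adds.
        self.initial_handles.add(handle)

    def on_unregister(self, handle):
        # Called whenever a shared state handle is unregistered
        # (e.g. its file was dropped after compaction).
        self.initial_handles.discard(handle)

    def initial_checkpoint_in_use(self):
        # An external tool could poll this (e.g. via a REST endpoint)
        # before deleting the retained checkpoint it restored from.
        return bool(self.initial_handles)

tracker = InitialCheckpointTracker()
for h in ("sst-1", "sst-2"):
    tracker.register_initial(h)
tracker.on_unregister("sst-1")
assert tracker.initial_checkpoint_in_use()       # sst-2 still referenced
tracker.on_unregister("sst-2")
assert not tracker.initial_checkpoint_in_use()   # safe to delete externally
```

The deletion delay mentioned above shows up here as the time until the set drains.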

3. Alternatively, could we re-upload not necessarily on the 1st checkpoint,
but after a configured number of checkpoints?
There is a high chance that after some more checkpoints the initial state
will no longer be used (because of compaction),
so backends won't have to re-upload anything (or only a small part).

4. Re-uploaded artifacts must not be deleted on checkpoint abortion.
This should be addressed in https://issues.apache.org/jira/browse/FLINK-24611.
If it isn't, I think the FLIP should consider this case.

5. Enforcing re-upload by a single task and Changelog state backend
With the Changelog state backend, a file can be shared by multiple operators.
Therefore, getIntersection() is irrelevant here, because the operators
might not share any key groups
(so we'll have to analyze "raw" file usage, I think).
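To illustrate why key-group intersection cannot detect file sharing with a changelog-style backend, here is a toy Python sketch. All data is invented, and `key_group_intersection` only mimics the semantics of Flink's `KeyGroupRange.getIntersection`; it is not Flink code:

```python
def key_group_intersection(a, b):
    # Ranges are inclusive (start, end) pairs, like Flink's KeyGroupRange.
    lo, hi = max(a[0], b[0]), min(a[1], b[1])
    return (lo, hi) if lo <= hi else None

# Two subtasks with disjoint key-group ranges that nonetheless reference
# the same changelog file (one file holds changes for several operators).
subtask_a = {"key_groups": (0, 63),   "files": {"changelog-7"}}
subtask_b = {"key_groups": (64, 127), "files": {"changelog-7"}}

# No overlapping key groups...
assert key_group_intersection(subtask_a["key_groups"],
                              subtask_b["key_groups"]) is None
# ...yet they still share a file, so file usage must be analyzed directly.
assert subtask_a["files"] & subtask_b["files"] == {"changelog-7"}
```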

6. Enforcing re-upload by a single task and skew
If we use some greedy logic like "subtask 0 always re-uploads", that
subtask might become overloaded.
So we'll have to obtain a full list of subtasks first (and then probably
choose randomly or round-robin).
However, that requires rebuilding the Task snapshot, which is doable but
not trivial (which I think supports the inverse-control API option from point 2).
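One way to spread the re-upload work is sketched below. This assumes the set of subtasks referencing each shared file is already known (which, as noted above, is the non-trivial part); the hash-based pick is just one illustration of a deterministic, skew-free choice alongside random or round-robin selection:

```python
import hashlib

def pick_uploader(shared_file: str, referencing_subtasks: list) -> int:
    # Deterministic choice that is spread across subtasks, so no single
    # subtask (e.g. subtask 0) re-uploads every shared file.
    h = int(hashlib.sha256(shared_file.encode()).hexdigest(), 16)
    return referencing_subtasks[h % len(referencing_subtasks)]

subtasks = [0, 1, 2, 3]
choices = {pick_uploader(f"shared/file-{i}.sst", subtasks) for i in range(50)}
assert choices <= set(subtasks)
assert len(choices) > 1  # load is spread over multiple subtasks
```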

7. I think it would be helpful to list file systems / object stores
that support "fast" copy (ideally with latency numbers).

Regards,
Roman



Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-22 Thread Yun Gao
Hi,

Many thanks, Dawid, for proposing the FLIP to clarify the ownership of the
states. +1 for the overall changes, since they make the behavior clear and
give users a deterministic way to finally clean up savepoints / retained
checkpoints.

Regarding the changes to the public interface, it seems the changes are
currently all bound to savepoints, but from the FLIP it seems we might also
need to support the claim declaration for retained checkpoints, as on the
CLI side [1]. If so, might it be better to change the option name from
`execution.savepoint.restore-mode` to something like `execution.restore-mode`?
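For context, a sketch of what this could look like in flink-conf.yaml. The option and value names follow the FLIP draft and the rename suggested above; the final names may differ:

```yaml
# As proposed in the FLIP (savepoint-scoped):
# execution.savepoint.restore-mode: NO_CLAIM

# Suggested generalized name covering savepoints and retained checkpoints:
execution.restore-mode: CLAIM   # one of: CLAIM, NO_CLAIM, LEGACY
```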

Best,
Yun


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint





Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-18 Thread Konstantin Knauf
Hi Dawid,

Thanks for working on this FLIP. Clarifying the differences and
guarantees around savepoints and checkpoints will make it easier and safer
for users and downstream projects and platforms to work with them.

+1 to changing the current (undefined) behavior when recovering from
retained checkpoints. Users can now choose between claiming and not
claiming, which I think will make the current mixed behavior obsolete.

Cheers,

Konstantin

On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz 
wrote:

> Hi devs,
>
> I'd like to bring up for a discussion a proposal to clean up ownership
> of snapshots, both checkpoints and savepoints.
>
> The goal here is to make it clear who is responsible for deleting
> checkpoints/savepoints files and when can that be done in a safe manner.
>
> Looking forward for your feedback!
>
> Best,
>
> Dawid
>
> [1] https://cwiki.apache.org/confluence/x/bIyqCw
>
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


[DISCUSS] FLIP-193: Snapshots ownership

2021-11-18 Thread Dawid Wysakowicz
Hi devs,

I'd like to bring up for a discussion a proposal to clean up ownership
of snapshots, both checkpoints and savepoints.

The goal here is to make it clear who is responsible for deleting
checkpoint/savepoint files and when that can be done in a safe manner.

Looking forward to your feedback!

Best,

Dawid

[1] https://cwiki.apache.org/confluence/x/bIyqCw



