[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5820.
---------------------------------
    Resolution: Fixed

Fixed by resolution of all subtasks

> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no way 
> to obtain a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily and 
> the checkpoint metadata has already been disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state 
> backend, not the "completed checkpoint store". The latter only stores pointers 
> to the latest available checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs exclusively to one specific checkpoint (shared state comes 
> as part of incremental checkpointing).
>   5. The StateBackend needs a hook to drop all checkpointed state up 
> to a specific checkpoint (covering all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints (see the interface sketch 
> after this list).
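> Below is a minimal sketch of what such cleanup hooks could look like. It is 
> purely illustrative: the interface and method names are assumptions for this 
> proposal, not existing Flink API.
> {code}
> // Hypothetical sketch, not actual Flink code: the kinds of cleanup hooks
> // described in points 4-6 above.
> public interface CheckpointStateCleanupHooks {
> 
>     // Drops all state that belongs exclusively to the given checkpoint (point 4).
>     void disposeCheckpoint(long checkpointId) throws Exception;
> 
>     // Drops all checkpointed state up to the given checkpoint,
>     // covering previously discarded checkpoints (point 5).
>     void disposeCheckpointsUpTo(long checkpointId) throws Exception;
> 
>     // Periodic hook that cleans up shared state from incremental checkpoints
>     // that is no longer referenced by any retained checkpoint (point 6).
>     void cleanupOrphanedSharedState() throws Exception;
> }
> {code}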
> For the {{FsStateBackend}}, which currently stores most of the checkpointed 
> state (transitively for RocksDB as well), this means restructuring the 
> storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
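> With this layout, dropping the exclusive state of a checkpoint becomes a 
> single recursive directory delete instead of thousands of individual file 
> deletes. A rough sketch, assuming Flink's {{FileSystem}} / {{Path}} utilities 
> (the helper class itself is hypothetical):
> {code}
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> 
> // Illustrative helper, not existing Flink code.
> public final class CheckpointDirectoryCleanup {
> 
>     // Deletes ../<flink-checkpoints>/<job-id>/chk-<checkpointId> recursively,
>     // i.e. all state exclusive to that checkpoint. Shared state under
>     // ../<job-id>/shared/ is left untouched.
>     public static void disposeExclusiveState(Path jobCheckpointDir, long checkpointId) throws Exception {
>         Path chkDir = new Path(jobCheckpointDir, "chk-" + checkpointId);
>         FileSystem fs = chkDir.getFileSystem();
>         fs.delete(chkDir, true); // recursive delete
>     }
> }
> {code}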
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
