Stephan Ewen created FLINK-5820:
-----------------------------------

             Summary: Extend State Backend Abstraction to support Global Cleanup Hooks
                 Key: FLINK-5820
                 URL: https://issues.apache.org/jira/browse/FLINK-5820
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
    Affects Versions: 1.2.0
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.3.0


The current state backend abstraction has the limitation that each piece of 
state is only meaningful in the context of its state handle. There is no way 
to get a view of "all state associated with checkpoint X".

This causes several issues:
  - State might not be cleaned up in the event of failures. When a 
TaskManager hands over a state handle to the JobManager and either of them 
fails, the state handle may be lost and the state lingers.
  - State might also linger if a cleanup operation fails temporarily after the 
checkpoint metadata has already been disposed, since nothing is left that 
points to that state.
  - State cleanup is more expensive than necessary in many cases. Each state 
handle is released individually. For large jobs, this means thousands of 
release operations (typically file deletes) per checkpoint, which can be 
expensive on some file systems.
  - It is hard to guarantee cleanup of parent directories with the current 
architecture.

The core changes proposed here are:

  1. Each job has one core {{StateBackend}}. In the future, operators may use 
different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix and 
match, for example, RocksDB storage and in-memory storage.

  2. The JobManager needs to be aware of the {{StateBackend}}.

  3. Storing checkpoint metadata becomes the responsibility of the state 
backend, not the "completed checkpoint store". The latter only stores the 
pointers to the latest available checkpoints (either in-process or in 
ZooKeeper).

  4. The {{StateBackend}} may optionally have a hook to drop all checkpointed 
state that belongs exclusively to one specific checkpoint (as opposed to 
shared state, which comes with incremental checkpointing).

  5. The {{StateBackend}} needs a hook to drop all checkpointed state up to a 
specific checkpoint (i.e., the state of all previously discarded checkpoints).

  6. In the future, this must support periodic cleanup hooks that track 
orphaned shared state from incremental checkpoints.
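
The hooks in items 4 and 5 could be pictured as a small interface on the state backend. The sketch below is hypothetical: {{CheckpointCleanupHooks}}, {{disposeCheckpoint}}, {{disposeCheckpointsBefore}} and {{InMemoryCheckpointStorage}} are illustrative names, not existing Flink API. It assumes that "up to a specific checkpoint" excludes that checkpoint itself, and it leaves shared state (item 6) out entirely.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical cleanup hooks on a state backend (illustrative names, not Flink API). */
interface CheckpointCleanupHooks {

    /** Item 4 (optional): drop all state that belongs exclusively to one checkpoint. */
    default void disposeCheckpoint(long checkpointId) {
        throw new UnsupportedOperationException("single-checkpoint disposal not supported");
    }

    /** Item 5: drop the exclusive state of every checkpoint with an ID lower than checkpointId. */
    void disposeCheckpointsBefore(long checkpointId);
}

/** Toy backend that tracks exclusive state per checkpoint in memory (assumption, for illustration). */
class InMemoryCheckpointStorage implements CheckpointCleanupHooks {

    // checkpoint ID -> names of the state objects exclusive to that checkpoint
    private final TreeMap<Long, Set<String>> exclusiveState = new TreeMap<>();

    void store(long checkpointId, String stateName) {
        exclusiveState.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(stateName);
    }

    /** Snapshot of the checkpoint IDs that still have exclusive state. */
    Set<Long> retainedCheckpoints() {
        return new TreeSet<>(exclusiveState.keySet());
    }

    @Override
    public void disposeCheckpoint(long checkpointId) {
        exclusiveState.remove(checkpointId);
    }

    @Override
    public void disposeCheckpointsBefore(long checkpointId) {
        // headMap(id) is a live view of all entries with keys strictly below id;
        // clearing the view removes them from the backing map in one call
        exclusiveState.headMap(checkpointId).clear();
    }
}
{code}

Keeping exclusive state grouped by checkpoint ID is what makes item 5 cheap in this sketch: dropping every older checkpoint is one range removal rather than one release call per state handle. A backend that cannot isolate a single checkpoint simply keeps the default {{disposeCheckpoint}}, which reports the hook as unsupported.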

For the {{FsStateBackend}}, which currently stores most of the checkpointed 
state (transitively also for RocksDB), this means restructuring the storage 
directories as follows:

{code}
../<flink-checkpoints>/job1-id/
                              /shared/    <-- shared checkpoint data
                              /chk-1/...  <-- data exclusive to checkpoint 1
                              /chk-2/...  <-- data exclusive to checkpoint 2
                              /chk-3/...  <-- data exclusive to checkpoint 3

../<flink-checkpoints>/job2-id/
                              /shared/...
                              /chk-1/...
                              /chk-2/...
                              /chk-3/...

../<flink-savepoints>/savepoint-1/savepoint-root
                                 /file-1-uid
                                 /file-2-uid
                                 /file-3-uid
                     /savepoint-2/savepoint-root
                                 /file-1-uid
                                 /file-2-uid
                                 /file-3-uid
{code}
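
A minimal sketch of how a file-system backend might map checkpoint IDs onto the per-job layout above and select whole {{chk-<n>}} directories for removal, assuming local paths handled with {{java.nio.file}}. {{CheckpointDirectoryLayout}} and its methods are hypothetical; a real backend would go through Flink's own {{FileSystem}} abstraction rather than {{java.nio}}. The savepoint layout is not covered.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

/**
 * Hypothetical helper for the layout <checkpointRoot>/<jobId>/{shared, chk-<n>}.
 * Names are illustrative only, not Flink API.
 */
class CheckpointDirectoryLayout {
    private static final String CHK_PREFIX = "chk-";
    private final Path jobDir;

    CheckpointDirectoryLayout(Path checkpointRoot, String jobId) {
        this.jobDir = checkpointRoot.resolve(jobId);
    }

    /** Directory for state shared across (incremental) checkpoints. */
    Path sharedDir() {
        return jobDir.resolve("shared");
    }

    /** Directory for state exclusive to one checkpoint. */
    Path exclusiveDir(long checkpointId) {
        return jobDir.resolve(CHK_PREFIX + checkpointId);
    }

    /** Selects chk-<n> entries with n < checkpointId; "shared" and other names are skipped. */
    List<Path> exclusiveDirsBefore(List<String> childNames, long checkpointId) {
        List<Path> result = new ArrayList<>();
        for (String name : childNames) {
            if (!name.startsWith(CHK_PREFIX)) {
                continue;
            }
            try {
                long id = Long.parseLong(name.substring(CHK_PREFIX.length()));
                if (id < checkpointId) {
                    result.add(jobDir.resolve(name));
                }
            } catch (NumberFormatException e) {
                // not a checkpoint directory; leave it alone
            }
        }
        return result;
    }

    /** Deletes a local directory tree, children before parents; no-op if it does not exist. */
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // reverse lexicographic order visits a child before its parent directory
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}

Because exclusive data is grouped per checkpoint, cleanup can target one directory per checkpoint instead of tracking every state handle, and state that leaked after a failure can still be found under its {{chk-<n>}} directory. {{shared/}} is never selected here, since its contents may still be referenced by later incremental checkpoints.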


This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)