[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3524 Merged. @shixiaogang can you please close this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3524 Merging this.
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524
@StephanEwen I have updated the PR with the following changes:
1. Add a method called `discardSharedStatesOnFail()` to `CompositeStateHandle`. This method is called when the pending checkpoint fails to complete. That way, we no longer need to register shared states as soon as an acknowledge message is received; all shared states are registered only when the pending checkpoint completes.
2. Add `SharedStateHandle` and refactor `SharedStateRegistry` as suggested.
3. Both registration and unregistration of shared states now take place in `CompletedCheckpoint`.
What do you think of these changes?
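The lifecycle described in points 1 and 3 can be sketched roughly as follows. This is only an illustration under stated assumptions: `RegistrySketch`, `HandleSketch` and `PendingSketch` are hypothetical stand-ins, not the classes in the PR, and file deletion is replaced by recording keys in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reference-counting registry; not the PR's SharedStateRegistry. */
final class RegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    void register(String key) {
        refCounts.merge(key, 1, Integer::sum);
    }

    int references(String key) {
        return refCounts.getOrDefault(key, 0);
    }
}

/** Hypothetical composite handle that knows the keys of the shared state it created. */
final class HandleSketch {
    private final List<String> sharedKeys;
    final List<String> deleted = new ArrayList<>(); // stands in for real file deletion

    HandleSketch(List<String> sharedKeys) {
        this.sharedKeys = sharedKeys;
    }

    void registerSharedStates(RegistrySketch registry) {
        sharedKeys.forEach(registry::register);
    }

    // Called only when the pending checkpoint fails. Nothing was registered yet,
    // so the handle removes the shared state it created by itself.
    void discardSharedStatesOnFail() {
        deleted.addAll(sharedKeys);
    }
}

/** Hypothetical pending checkpoint: acknowledging a handle does not touch the registry. */
final class PendingSketch {
    private final List<HandleSketch> handles = new ArrayList<>();

    void acknowledge(HandleSketch handle) {
        handles.add(handle);
    }

    // Registration happens once, when the checkpoint completes.
    void complete(RegistrySketch registry) {
        handles.forEach(h -> h.registerSharedStates(registry));
    }

    void fail() {
        handles.forEach(HandleSketch::discardSharedStatesOnFail);
    }
}
```

In this sketch a failed checkpoint never appears in the registry, so no other checkpoint can hold a reference to its shared state when it is deleted.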
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524
@StephanEwen Thanks very much for your valuable comments. Here are some of my thoughts.
* The registration of shared states currently lives in `CheckpointCoordinator` because it is needed whenever a `PendingCheckpoint` receives a state handle or a `CompletedCheckpoint` is recovered. But I think it does make sense to put both the registration and the unregistration of shared states in the same place. I will update the PR so that the logic lives in `PendingCheckpoint` and `CompletedCheckpoint`.
* When a `SubtaskState` is not successfully added to the `PendingCheckpoint`, the state objects in the `SubtaskState` should be deleted correctly. How a `SubtaskState` must be discarded differs by case. When the `PendingCheckpoint` fails, the `SubtaskState` should delete both its private states and its shared states. When the `CompletedCheckpoint` is subsumed, the `SubtaskState` should instead delete the shared states that are no longer referenced (possibly created by other checkpoints), not its own shared states. By registering the shared states first, we can unify the implementation of the two cases: the shared states in a failed `PendingCheckpoint` are never referenced by other checkpoints, so the registry discards them correctly when the `PendingCheckpoint` unregisters its shared states, just as it does for a subsumed `CompletedCheckpoint`. Another option is to refactor the interface of `CompositeStateHandle` so that it provides three methods, namely `onComplete()`, `onFail()` and `onSubsume()`. A `CompositeStateHandle` can implement these methods to handle its states correctly in each case. What do you think?
* It's a good idea to introduce `SharedStateHandle` for shared states. It can improve performance and allows safety checks. I will add it in the update.
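The alternative interface mentioned in the second bullet might look roughly like the sketch below. Only the three method names come from the comment; the interface name `LifecycleAwareHandle`, the `RecordingHandle` class and the recorded action strings are assumptions made for illustration, and no real state is touched.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shape of the refactored interface; only the method names come from the thread. */
interface LifecycleAwareHandle {
    void onComplete(); // the pending checkpoint completed
    void onFail();     // the pending checkpoint failed
    void onSubsume();  // the completed checkpoint was subsumed by a newer one
}

/** Records what it would do in each case instead of deleting real files. */
final class RecordingHandle implements LifecycleAwareHandle {
    final List<String> actions = new ArrayList<>();

    @Override
    public void onComplete() {
        actions.add("register shared states");
    }

    @Override
    public void onFail() {
        // A failed checkpoint deletes everything it created, shared or not.
        actions.add("delete private states");
        actions.add("delete own shared states");
    }

    @Override
    public void onSubsume() {
        // A subsumed checkpoint only gives up its references; shared states that
        // are still referenced by newer checkpoints must survive.
        actions.add("delete private states");
        actions.add("unregister shared states");
    }
}
```

The trade-off against the "register first" approach is that each handle has to get the three cases right itself, instead of relying on the registry to decide what can be deleted.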
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524
@StephanEwen I have updated the PR as suggested. The changes include:
1. Turn `StateRegistry` into `SharedStateRegistry`, in which only shared states are registered. The `discardState()` method is now supposed to delete the private states in the checkpoint.
2. `SharedStateRegistry` now lives in the `CheckpointCoordinator`. A state handle registers its shared states once it is received by the coordinator. In other words, all shared states in a completed checkpoint are registered when the checkpoint is added to the `CompletedCheckpointStore`. All checkpoints (including savepoints) unregister their shared states when they are removed from the store. Savepoints should not contain any shared state, so the unregistration will not discard any state in the savepoints.
3. When recovering from failures or restarting from a savepoint, the `CheckpointCoordinator` rebuilds the registry from the checkpoints recovered in the `CompletedCheckpointStore`.
4. Related tests are added to ensure correctness.
5. The conflicts with the master branch are resolved.
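As a rough illustration of point 3, rebuilding the registry amounts to recounting references over the recovered checkpoints. The sketch below assumes each recovered checkpoint is reduced to the list of shared-state keys it references; `RebuildSketch` and that representation are hypothetical, not the PR's code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: a checkpoint is modeled as the list of shared-state keys it references. */
final class RebuildSketch {

    /** Counts, per shared-state key, how many recovered checkpoints reference it. */
    static Map<String, Integer> rebuild(List<List<String>> recoveredCheckpoints) {
        Map<String, Integer> refCounts = new HashMap<>();
        for (List<String> sharedKeys : recoveredCheckpoints) {
            for (String key : sharedKeys) {
                refCounts.merge(key, 1, Integer::sum);
            }
        }
        return refCounts;
    }
}
```

A savepoint would contribute an empty list here, which matches the statement that unregistering a savepoint discards nothing.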
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524 Hi @StephanEwen, the main reason is that we need a way to delete the unshared objects in failed `PendingCheckpoint`s. The `discardState()` method is called when either the `PendingCheckpoint` fails or the `CompletedCheckpoint` is subsumed. In the current design, `discardState()` is supposed to delete only the unshared objects, while the shared objects are deleted by the `StateRegistry`. Hence, we must register state handles as soon as they are received so that their shared objects can be deleted correctly.
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 Can you give me some background on why you also want `PendingCheckpoint` to register its state immediately (and not only upon completion)? I see no problem with that; I just want to double-check whether we are changing the assumption from the original design doc, where you suggested that shared state can only be referenced by another checkpoint if it is already part of a committed checkpoint.
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524
@shixiaogang Thanks for the fast response!
Can y
In the initial design document, you suggest that shared state is only s
I think what we need is a subclass of `StateHandle` that is a `SharedStateHandle`. I would suggest that the shared state handle has a method `String getKey()` (or `Object getKey()`) which returns the unique identifier of the shared state. The `SharedStateRegistry` could internally use something like a `Map`. I think that would give us a bit more flexibility in how we describe "equality of shared state": we don't need to make sure that the state handles themselves implement `equals()` in a way that meets the semantics of the shared state registry. In our current case, the `getKey()` method could return the normalized path of the file.
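A minimal sketch of the keyed-registry idea, for illustration only. The `getKey()` method and the "normalized path" key come from the comment above; the type names `KeyedSharedHandle`, `FileSharedHandle` and `KeyedRegistrySketch`, the map's value type, and the use of `java.nio.file.Paths` for normalization are assumptions.

```java
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shared handle: identity comes from getKey(), not from equals(). */
interface KeyedSharedHandle {
    String getKey();
}

/** Hypothetical file-backed handle whose key is the normalized file path. */
final class FileSharedHandle implements KeyedSharedHandle {
    private final String rawPath;

    FileSharedHandle(String rawPath) {
        this.rawPath = rawPath;
    }

    @Override
    public String getKey() {
        // normalize() removes redundant "." and ".." segments.
        return Paths.get(rawPath).normalize().toString();
    }
}

/** Hypothetical registry that counts references per key. */
final class KeyedRegistrySketch {

    private static final class Entry {
        final KeyedSharedHandle handle; // first handle seen for this key
        int refCount;

        Entry(KeyedSharedHandle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers one more reference and returns the new reference count for the key. */
    int register(KeyedSharedHandle handle) {
        Entry entry = entries.computeIfAbsent(handle.getKey(), k -> new Entry(handle));
        entry.refCount++;
        return entry.refCount;
    }
}
```

With this shape, two handle objects created from `/chk/shared/a.sst` and `/chk/1/../shared/a.sst` count as the same shared state even though neither class overrides `equals()`.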
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3524
@StephanEwen Thanks a lot for your valuable comments. I will update the PR as suggested.
* I think it's a good idea to turn the `StateRegistry` into a `SharedStateRegistry`. That requires that (1) the state handle must not discard registered objects in its `discardState` method and (2) the state handle registers its shared objects as soon as it is received by the coordinator (currently a state handle does not register its objects before its checkpoint completes).
* Since state handles have to register their objects as soon as they are received by the coordinator, we should move the `SharedStateRegistry` from `CompletedCheckpointStore` to `CheckpointCoordinator`.
* It's better for the `StateRegistry` to discard an object directly once its reference count reaches 0. I used a list to collect discarded objects because I wanted to keep the changes to the discarding of completed checkpoints as small as possible.
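The "discard directly at reference count 0" behavior from the last bullet could be sketched as below. `EagerDiscardRegistry` and its `Consumer<String>` discard callback are hypothetical; the callback stands in for whatever actually deletes the state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical registry that discards a state as soon as its last reference is removed. */
final class EagerDiscardRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Consumer<String> discarder; // stands in for the real deletion

    EagerDiscardRegistry(Consumer<String> discarder) {
        this.discarder = discarder;
    }

    void register(String key) {
        refCounts.merge(key, 1, Integer::sum);
    }

    void unregister(String key) {
        Integer count = refCounts.get(key);
        if (count == null) {
            throw new IllegalStateException("not registered: " + key);
        }
        if (count == 1) {
            // Last reference gone: drop the entry and discard immediately,
            // instead of collecting the key in a list for the caller.
            refCounts.remove(key);
            discarder.accept(key);
        } else {
            refCounts.put(key, count - 1);
        }
    }
}
```

Compared with collecting unreferenced objects in a list, this keeps the decision and the deletion in one place, at the cost of running the deletion inside the registry call.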
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 One more question: Can the StateRegistry not directly drop states that have no reference any more when states are unregistered? Is there a special reason for first collecting these states in a list, then getting them and then dropping them?
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524
Thanks for opening this pull request. Adding a `CompositeStateHandle` and a `StateRegistry` is a good idea. Some thoughts:
- What do you think about making the `StateRegistry` into a `SharedStateRegistry` which only contains the handles to state that is shared across checkpoints? State that is exclusive to a checkpoint is not handled by that registry, but remains only in the checkpoint. That way we "isolate" the existing behavior against the coming changes and do not risk regressions in the state cleanup code (which is very critical for current users).
- Another reason for the above suggestion is to also bring some other code into place that has some "fast paths" and "safety nets" for checkpoint cleanups (currently only with non-shared state), for example dropping a checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 ). We have seen that for various users the state cleanup problems are among the biggest problems they have, which we can address very well with the work started in the above linked pull request. These things would work together seamlessly if the registry deals only with shared state handles.
- I am wondering if it is easier to put the registry into the checkpoint coordinator rather than the checkpoint stores. That way we need the code that deals with adding / failure handling / etc only once.