[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-04-22 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3524
  
Merged. @shixiaogang can you please close this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-04-22 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3524
  
Merging this.




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-04-07 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen I have updated the PR, making the following changes:
1. Add a method called `discardSharedStatesOnFail()` in 
`CompositeStateHandle`.  This method is called when the pending checkpoint 
fails to complete. That way, we do not need to register shared states once an 
acknowledge message is received.  All shared states are registered only when 
the pending checkpoint completes.
2. Add `SharedStateHandle` and refactor `SharedStateRegistry` as suggested. 
3. Both registration and unregistration of shared states now take 
place in `CompletedCheckpoint`.

What do you think of these changes?




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-29 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen  Thanks very much for your valuable comments. The following 
are some of my thoughts.

* Now the registration of shared states is put in `CheckpointCoordinator` 
because it's needed whenever a `PendingCheckpoint` receives a state handle or a 
`CompletedCheckpoint` is recovered. But I think it does make sense to put both 
the registration and unregistration of shared states in the same place. I will 
update the PR so that the logic is put in `PendingCheckpoint`s and 
`CompletedCheckpoint`s.

* When a `SubtaskState` is not successfully added to the 
`PendingCheckpoint`, the state objects in the `SubtaskState` should be 
correctly deleted. The discarding of these `SubtaskState`s varies in different 
cases. In the case where the `PendingCheckpoint` fails, the `SubtaskState` 
should delete both its private states and shared states. But in the case where 
the `CompletedCheckpoint` is subsumed, the `SubtaskState` should delete those 
unreferenced shared states (possibly created by others) instead of its shared 
states.   

  By registering the shared states first, we can unify the implementation 
in the two cases. The shared states in a failed `PendingCheckpoint` are 
never referenced by other checkpoints, so they can be correctly discarded 
by the registry when the `PendingCheckpoint` unregisters its shared states, 
just like a subsumed `CompletedCheckpoint` does.

  Another choice is refactoring the interface of `CompositeStateHandle`. 
Three methods, namely `onComplete()`, `onFail()` and `onSubsume()`, will be 
provided. A `CompositeStateHandle` can implement these methods to correctly deal 
with its states under these cases.  What do you think?

* It's a good idea to introduce `SharedStateHandle` for shared states. It 
can improve the performance and allow safety checks. I will add it in the 
update.
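
The three-callback alternative mentioned above could look roughly like the sketch below. Only the names `CompositeStateHandle`, `onComplete()`, `onFail()` and `onSubsume()` come from this thread; `RefCounts`, `IncrementalHandle`, the constructor arguments and the `deleted` log are hypothetical stand-ins for illustration, not Flink classes. It assumes that a failed checkpoint deletes its private files plus the shared files it created itself, while a subsumed checkpoint only releases its references and leaves deletion of shared files to the registry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reference counter for shared files; not Flink's SharedStateRegistry. */
class RefCounts {
    private final Map<String, Integer> counts = new HashMap<>();
    private final List<String> deleted; // log of deleted files, for illustration only

    RefCounts(List<String> deleted) {
        this.deleted = deleted;
    }

    void register(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    /** Drops one reference and deletes the file once nothing references it. */
    void unregister(String key) {
        Integer count = counts.get(key);
        if (count == null) {
            throw new IllegalStateException("Not registered: " + key);
        }
        if (count == 1) {
            counts.remove(key);
            deleted.add(key);
        } else {
            counts.put(key, count - 1);
        }
    }
}

/** The three lifecycle callbacks proposed in the comment above. */
interface CompositeStateHandle {
    void onComplete();
    void onFail();
    void onSubsume();
}

/** Hypothetical handle with private files, newly created shared files and reused shared files. */
class IncrementalHandle implements CompositeStateHandle {
    private final RefCounts registry;
    private final List<String> deleted;
    private final List<String> privateFiles;
    private final List<String> createdShared;
    private final List<String> reusedShared;

    IncrementalHandle(RefCounts registry, List<String> deleted, List<String> privateFiles,
                      List<String> createdShared, List<String> reusedShared) {
        this.registry = registry;
        this.deleted = deleted;
        this.privateFiles = privateFiles;
        this.createdShared = createdShared;
        this.reusedShared = reusedShared;
    }

    /** Checkpoint completed: every shared file it uses gains one reference. */
    @Override
    public void onComplete() {
        createdShared.forEach(registry::register);
        reusedShared.forEach(registry::register);
    }

    /** Checkpoint failed before registration: delete private files and the shared files it created. */
    @Override
    public void onFail() {
        deleted.addAll(privateFiles);
        deleted.addAll(createdShared);
    }

    /** Checkpoint subsumed: delete private files, release references, let the registry decide. */
    @Override
    public void onSubsume() {
        deleted.addAll(privateFiles);
        createdShared.forEach(registry::unregister);
        reusedShared.forEach(registry::unregister);
    }
}
```

In this sketch a failed checkpoint never touches the registry, so shared files created by earlier checkpoints cannot be deleted by accident.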




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-24 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen I have updated the PR as suggested. Changes include
1. Turn `StateRegistry` into `SharedStateRegistry`, where only shared 
states are registered. The `discardState()` method is now supposed to delete 
only the private states in the checkpoint.
2. `SharedStateRegistry` is now held by the `CheckpointCoordinator`. 
The state handle registers its shared states once it is received by the 
coordinator. In other words, all shared states in a completed checkpoint are 
registered when the checkpoint is added to the `CompletedCheckpointStore`.  
All checkpoints (including savepoints) will unregister shared states when they 
are removed from the store. Savepoints should not contain any shared state. 
Therefore the unregistration will not discard any state in the savepoints.
3. When recovering from failures or restarting from a savepoint, the 
`CheckpointCoordinator` will rebuild the registry with the checkpoints 
recovered in the `CompletedCheckpointStore`.
4. Related tests are added to ensure correctness.
5. The conflicts with the master branch are resolved.
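
The registry rebuild described in item 3 amounts to recounting references from the recovered checkpoints. A minimal sketch, assuming each recovered checkpoint can list the keys of its shared states; `RecoveredCheckpoint`, `RegistryRebuilder` and `rebuild` are hypothetical names used only for illustration and are not part of the PR:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a recovered completed checkpoint. */
class RecoveredCheckpoint {
    final long id;
    final List<String> sharedStateKeys;

    RecoveredCheckpoint(long id, List<String> sharedStateKeys) {
        this.id = id;
        this.sharedStateKeys = sharedStateKeys;
    }
}

class RegistryRebuilder {
    /** Recounts references from scratch: one reference per checkpoint that uses a shared state. */
    static Map<String, Integer> rebuild(List<RecoveredCheckpoint> recovered) {
        Map<String, Integer> refCounts = new HashMap<>();
        for (RecoveredCheckpoint checkpoint : recovered) {
            for (String key : checkpoint.sharedStateKeys) {
                refCounts.merge(key, 1, Integer::sum);
            }
        }
        return refCounts;
    }
}
```

A savepoint that contains no shared state contributes nothing to the counts, which is consistent with item 2 above.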






[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
Hi @StephanEwen The main reason is that we must have methods to delete 
those unshared objects in failed `PendingCheckpoint`s.  The `discardState()` 
method is called when either the `PendingCheckpoint` fails or the 
`CompletedCheckpoint` is subsumed. Under current settings, the `discardState()` 
is supposed to delete only those unshared objects, and the shared objects are 
deleted by the `StateRegistry`.  Hence, we must register those state handles 
once they are received  so that their shared objects can be correctly deleted.




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Can you give me some background on why you also want `PendingCheckpoint` 
to register its state immediately (and not only upon 
completion)?
I see no problem with that, just want to double check whether we are 
changing the assumption from the original design doc, where you suggested that 
shared state can only be referenced by another checkpoint, if it is already 
part of a committed checkpoint.




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
@shixiaogang Thanks for the fast response!

Can y In the initial design document, you suggest that shared state is only 
s


I think what we need is a subclass of `StateHandle` that is a 
`SharedStateHandle`. I would suggest that the shared state handle has a method 
`String getKey()` (or `Object getKey()`) which gives the unique identifier of 
the shared state. The `SharedStateRegistry` internally could use something like 
a `Map`. I think that would give us a bit 
more flexibility in how we describe "equality of shared state": we don't need 
to make sure that the state handles themselves implement `equals()` such that 
it meets the semantics of the shared state registry. In the case we have 
currently, the `getKey()` method could return the normalized path of the file.
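
The key-based registry suggested above might be sketched as follows. Only `SharedStateHandle`, `SharedStateRegistry` and `String getKey()` are taken from this comment; the `register`/`unregister` signatures, the `discardState()` method on the handle, the internal `Entry` class and the `PathHandle` example are assumptions made for illustration, not the merged Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shape of a shared state handle; only getKey() comes from the discussion. */
interface SharedStateHandle {
    String getKey();
    void discardState();
}

/** Reference-counting registry keyed by getKey() rather than by the handle's equals(). */
class SharedStateRegistry {
    private static final class Entry {
        final SharedStateHandle handle;
        int refCount;

        Entry(SharedStateHandle handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Adds one reference for the handle's key and returns the new count. */
    synchronized int register(SharedStateHandle handle) {
        Entry entry = entries.computeIfAbsent(handle.getKey(), k -> new Entry(handle));
        return ++entry.refCount;
    }

    /** Drops one reference; discards the state when the last one is gone. Returns the remaining count. */
    synchronized int unregister(SharedStateHandle handle) {
        Entry entry = entries.get(handle.getKey());
        if (entry == null) {
            throw new IllegalStateException("Not registered: " + handle.getKey());
        }
        if (--entry.refCount == 0) {
            entries.remove(handle.getKey());
            entry.handle.discardState();
        }
        return entry.refCount;
    }
}

/** Example handle whose key is a file path (assumed to be normalized already). */
class PathHandle implements SharedStateHandle {
    final String path;
    boolean discarded;

    PathHandle(String path) {
        this.path = path;
    }

    @Override
    public String getKey() {
        return path;
    }

    @Override
    public void discardState() {
        discarded = true; // a real handle would delete the file here
    }
}
```

Two distinct `PathHandle` objects with the same path share one entry, so the handles do not need an `equals()` that matches the registry's notion of identity.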






[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-19 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3524
  
@StephanEwen Thanks a lot for your valuable comments. I will update the PR 
as suggested.
* I think it's a good idea to turn the `StateRegistry` into a 
`SharedStateRegistry`.  That requires that (1) the state handle must not discard 
registered objects in the `discardState` method and (2) the state handle 
registers its shared objects once it is received by the coordinator (currently the state 
handle does not register its objects before its checkpoint completes).
* Now that state handles have to register their objects once they are 
received by the coordinator, we should move `SharedStateRegistry` from 
`CompletedCheckpointStore` to `CheckpointCoordinator`.
* It's better for `StateRegistry` to directly discard an object once its 
reference count is 0. I used a list to collect discarded objects because I wanted 
to keep the changes to the discarding of completed checkpoints as small as 
possible. 




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
One more question: Can the StateRegistry not directly drop states that have 
no reference any more when states are unregistered? Is there a special reason 
for first collecting these states in a list, then getting them and then 
dropping them?




[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Thanks for opening this pull request. Adding a `CompositeStateHandle` and a 
`StateRegistry` is a good idea.

Some thoughts:

  - What do you think about making the `StateRegistry` into a 
`SharedStateRegistry` which only contains the handles to state that is shared 
across checkpoints? State that is exclusive to a checkpoint is not handled by 
that registry, but remains only in the checkpoint. That way we "isolate" the 
existing behavior against the coming changes and do not risk regressions in the 
state cleanup code (which is very critical for current users).

  - Another reason for the above suggestion is to also bring some other 
code into place that has some "fast paths" and "safety nets" for checkpoint 
cleanups (currently only with non-shared state), for example dropping a 
checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 
). We have seen that for various users the state cleanup problems are among the 
biggest problems they have, which we can address very well with the work 
started in the above linked pull request. These things would work together 
seamlessly if the registry deals only with shared state handles.

  - I am wondering if it is easier to put the registry into the checkpoint 
coordinator rather than the checkpoint stores. That way we need the code that 
deals with adding / failure handling / etc only once.


