[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-23 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16728133#comment-16728133
 ] 

Yun Tang commented on FLINK-10930:
--

I plan to close this issue; further discussion about refactoring the 
checkpoint coordinator should move to 
[FLINK-10724|https://issues.apache.org/jira/browse/FLINK-10724]. I think the 
discussion above in this issue could still offer some inspiration for that refactoring.

Anyone should feel free to reopen this issue if new ideas come up.

> Refactor checkpoint directory layout
> 
>
> Key: FLINK-10930
> URL: https://issues.apache.org/jira/browse/FLINK-10930
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.8.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.8.0
>
>
> The current checkpoint directory layout was introduced in FLINK-8531 with 
> three different scopes for tasks:
>  * *EXCLUSIVE* is for state that belongs to one checkpoint only: metadata 
> and operator state files.
>  * *SHARED* is for state that is possibly part of multiple checkpoints.
>  * *TASKOWNED* is for state that must never be dropped by the JobManager.
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- chk-1/  // metadata and operator-state files
>   +-- chk-2/
> ...{code}
> If we retain only one completed checkpoint, only one exclusive directory, 
> namely the {{chk-id}} checkpoint directory, is expected to remain. 
> However, as described in FLINK-10855, the directories of failed/expired 
> checkpoints are also left behind. This is really confusing for users who [use 
> externalized checkpoints to resume 
> jobs|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint],
>  not to mention the resource leak of checkpoint directories. 
>  As far as I know, as long as the {{chk-id}} checkpoint directory still 
> contains the operator state files, I see no way to clean up a useless 
> {{chk-id}} checkpoint directory gracefully. Once the JobManager disposes of a 
> failed/expired checkpoint, the JM deletes the target {{chk-id}} checkpoint 
> directory. However, this directory may also be created by tasks that have 
> not yet reported to the JM. When the {{checkpoint coordinator}} receives 
> those late messages from tasks of the expired checkpoint, it discards the 
> useless handles. However, if the JM also deleted the empty parent folder 
> (which is no longer supported since FLINK-8540), another task still 
> uploading operator state files would hit an exception because the parent 
> directory of its write target had just been removed. 
> Currently, a task checkpoint failure is handled as a task failure, so the 
> whole job fails over, which is not what we want.
> From what I see, I plan to split the *EXCLUSIVE* directory into two kinds of 
> exclusive directories: one is still the several {{chk-id}} checkpoint 
> directories, each containing only its exclusive {{meta data}}; the other is a 
> single directory named {{exclusive}} that contains the operator state files. 
> Since each operator state file is exclusive to exactly one checkpoint, we 
> could also add the {{checkpoint-id}} to its file name so that users can 
> easily clean up.
>  The refactored directory layout should be:
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- exclusive/  // operator state files
>   +-- chk-1/      // metadata
>   +-- chk-2/
> ...{code}
>  
> This new directory layout would not affect users who use externalized 
> checkpoints to resume jobs, since they still just pass the 
> {{/user-defined-dir/job-id/chk-id}} path to resume a job.
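To make the proposal concrete, here is a minimal Java sketch of how paths could be derived under the proposed layout. The file-name pattern {{chk-<checkpointId>-<suffix>}} and the {{ProposedLayout}} class are hypothetical illustrations, not an existing Flink API; the sketch only shows that embedding the checkpoint id in each file name lets a user or a tool tell which checkpoint a file in {{exclusive/}} belongs to without reading any metadata.

{code:java}
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical naming scheme for the proposed layout (not Flink's API):
 * metadata goes to {job-id}/chk-{id}/, operator state files go to
 * {job-id}/exclusive/ with the checkpoint id embedded in the file name.
 */
final class ProposedLayout {
    // Hypothetical file-name pattern: "chk-<checkpointId>-<uniqueSuffix>"
    private static final Pattern EXCLUSIVE_FILE = Pattern.compile("^chk-(\\d+)-.+$");

    static String metadataDir(String baseDir, String jobId, long checkpointId) {
        return baseDir + "/" + jobId + "/chk-" + checkpointId;
    }

    static String exclusiveFile(String baseDir, String jobId, long checkpointId, String uniqueSuffix) {
        return baseDir + "/" + jobId + "/exclusive/chk-" + checkpointId + "-" + uniqueSuffix;
    }

    /** Extracts the checkpoint id from an exclusive file name, if it follows the scheme. */
    static OptionalLong checkpointIdOf(String fileName) {
        Matcher m = EXCLUSIVE_FILE.matcher(fileName);
        return m.matches() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }
}
{code}

With such a scheme, removing leftovers amounts to deleting every file in {{exclusive/}} whose parsed id is not among the retained checkpoints.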



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-10 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714977#comment-16714977
 ] 

Andrey Zagrebin commented on FLINK-10930:
-

I agree that the JM's CheckpointCoordinator should be the central point of 
checkpoint failure handling and decision making.

We introduced a task-local failure flag in FLINK-4809 as part of FLINK-4808.

In the latest attempt to continue on FLINK-4808, we concluded in the PR for 
FLINK-10074 (eventually a duplicate of FLINK-4808/FLINK-4810) that failure 
handling should move from the TM to the JM.

I think once we have more unified failure handling in CheckpointCoordinator 
(FLINK-10724), we can efficiently decide when to fail the job.

This could also help to investigate what can be done proactively to clean up 
any possible leftovers from tasks.



[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-10 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714844#comment-16714844
 ] 

Andrey Zagrebin commented on FLINK-10930:
-

One more example of a job failure caused by a checkpoint failure in a single 
task, due to checkpoint disposal on the JM side.



[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-09 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16713903#comment-16713903
 ] 

Yun Tang commented on FLINK-10930:
--

[~StephanEwen] The ideas for cleaning up exclusive and shared state look very 
similar to our solution, in which we already encode the checkpoint-id in the 
state files.
 * To clean up exclusive state more eagerly, we remove the state that is older 
than the *latest* completed checkpoint and is not contained in any currently 
retained checkpoint.
 * Cleaning up shared state is more complex for us, because we introduced a 
mechanism that writes state into files with the same name across different 
checkpoints (to reduce both the number of files created by each checkpoint and 
the number of files subsumed with each old checkpoint); the encoded 
checkpoint-id is the id of the checkpoint in which the file was first created. 
Although a bit more complex, our rule for cleaning up shared state behaves 
exactly like yours: delete all files that are not referenced by any shared 
state handle and are older than the latest completed checkpoint. To clean up 
more eagerly, we also have an option to run this procedure (listing and 
finding the files that can be deleted) before any tasks start running. Last 
but not least, "lists all files in the shared state directory" requires a new 
{{listStatusIterator}} API, because the current {{listStatus}} API might crash 
the JM and is expensive for the DFS.
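The difference between the two exclusive-state rules in this thread is easiest to see side by side. The following is a minimal Java sketch over plain checkpoint ids (no file system access); the class and method names are hypothetical. The oldest-retained rule only removes checkpoints older than the oldest retained one, while the eager rule described above also removes failed or expired checkpoints that lie between retained ones.

{code:java}
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch of the two exclusive-state deletion rules discussed in this issue. */
final class ExclusiveCleanupRules {
    /** Eager rule: older than the latest completed checkpoint and not retained. */
    static SortedSet<Long> eagerDeletable(Set<Long> onDisk, long latestCompleted, Set<Long> retained) {
        SortedSet<Long> result = new TreeSet<>();
        for (long id : onDisk) {
            if (id < latestCompleted && !retained.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    /** Oldest-retained rule: older than the oldest retained checkpoint. */
    static SortedSet<Long> oldestRetainedDeletable(Set<Long> onDisk, SortedSet<Long> retained) {
        SortedSet<Long> result = new TreeSet<>();
        if (retained.isEmpty()) {
            return result; // nothing retained yet: delete nothing
        }
        long oldestRetained = retained.first();
        for (long id : onDisk) {
            if (id < oldestRetained) {
                result.add(id);
            }
        }
        return result;
    }
}
{code}

For example, with checkpoints 1 to 5 on disk, retained checkpoints 2 and 4, and 4 as the latest completed one, the eager rule selects 1 and 3, whereas the oldest-retained rule selects only 1. Both rules leave checkpoint 5 alone, since it may still be in flight.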


[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-06 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712017#comment-16712017
 ] 

Stephan Ewen commented on FLINK-10930:
--

[~yunta] Better exception handling is definitely always a good idea. I would 
suggest proceeding as follows:

  - We work towards a model where checkpoint failures never cause task 
failures, but only "decline messages". As far as I know, [~azagrebin] mentioned 
that there is already some discussion on this. Could you chime in here, Andrey?

  - We fix all cases we find where state is left behind after exceptions.

  - We introduce an exclusive state cleanup daemon (see below).

  - We introduce a shared state cleanup daemon (see below).
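The first point can be illustrated with a small task-side wrapper that reports a snapshot exception to the coordinator as a decline instead of rethrowing it. This is a hypothetical Java sketch: {{CoordinatorGateway}}, {{acknowledge}} and {{decline}} are illustrative names, not Flink's RPC interfaces.

{code:java}
import java.util.concurrent.Callable;

/**
 * Hypothetical sketch: a task-side wrapper that turns a snapshot exception into a
 * "decline" message for the coordinator instead of failing the task.
 * CoordinatorGateway and its methods are illustrative, not Flink's API.
 */
final class DecliningSnapshotRunner {
    interface CoordinatorGateway {
        void acknowledge(long checkpointId, String stateHandle);
        void decline(long checkpointId, Throwable reason);
    }

    private final CoordinatorGateway gateway;

    DecliningSnapshotRunner(CoordinatorGateway gateway) {
        this.gateway = gateway;
    }

    /** Runs the snapshot; any exception is reported as a decline and never rethrown. */
    void runSnapshot(long checkpointId, Callable<String> snapshot) {
        String handle;
        try {
            handle = snapshot.call();
        } catch (Exception e) {
            // The task keeps running; the JM decides whether the job must fail.
            gateway.decline(checkpointId, e);
            return;
        }
        gateway.acknowledge(checkpointId, handle);
    }
}
{code}

With this shape, a task whose upload fails because its {{chk-id}} directory was already removed only produces a decline, and the decision to fail the job stays on the JM side.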

h3. Cleaning up exclusive state (idea)

  - when the daemon starts a cleanup sweep, it determines the number of the 
oldest still-retained checkpoint
  - it enumerates all "chk-XXX" directories and recursively deletes the ones 
older than the oldest retained checkpoint
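One sweep of such an exclusive-state cleanup could look like the following Java sketch. It assumes a local file system reachable through {{java.nio.file}} (a real implementation would go through Flink's {{FileSystem}} abstraction) and assumes the caller supplies the oldest retained checkpoint id; the class name is hypothetical.

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of one exclusive-state cleanup sweep on a local file system. */
final class ExclusiveStateSweeper {
    private static final Pattern CHK_DIR = Pattern.compile("^chk-(\\d+)$");

    /** Deletes every "chk-N" directory under jobDir with N < oldestRetained; returns how many. */
    static int sweep(Path jobDir, long oldestRetained) throws IOException {
        List<Path> expired = new ArrayList<>();
        // Collect first, delete afterwards, so the listing is not modified while iterating it.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(jobDir, "chk-*")) {
            for (Path entry : entries) {
                Matcher m = CHK_DIR.matcher(entry.getFileName().toString());
                if (m.matches() && Files.isDirectory(entry)
                        && Long.parseLong(m.group(1)) < oldestRetained) {
                    expired.add(entry);
                }
            }
        }
        for (Path dir : expired) {
            deleteRecursively(dir);
        }
        return expired.size();
    }

    private static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
{code}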


h3. Cleaning up shared state (idea)

  - every shared state file needs to encode the id of the checkpoint during 
which it was created
  - when the daemon does a cleanup sweep, it atomically grabs the number of the 
latest completed checkpoint and a snapshot of all shared state handles in the 
shared state registry
  - the daemon then lists all files in the shared state directory and deletes 
all files that are not referenced by any shared state handle and are older 
than the latest completed checkpoint
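The selection step of this shared-state sweep can be sketched in Java as a pure function over a consistent snapshot of its inputs. The sketch assumes the creation checkpoint id has already been parsed from each file name (the encoding itself is not specified here), and the class name is hypothetical; taking the latest completed checkpoint id and the registry snapshot atomically remains the caller's job.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the selection step of a shared-state cleanup sweep. */
final class SharedStateSweep {
    /**
     * @param fileToCreationCheckpoint file name mapped to the id of the checkpoint that created it
     * @param referencedFiles          files referenced by the registry snapshot
     * @param latestCompleted          latest completed checkpoint id, taken with the same snapshot
     */
    static List<String> deletable(Map<String, Long> fileToCreationCheckpoint,
                                  Set<String> referencedFiles,
                                  long latestCompleted) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileToCreationCheckpoint.entrySet()) {
            boolean unreferenced = !referencedFiles.contains(e.getKey());
            // Files from the latest completed or from in-flight checkpoints may not be
            // in the registry snapshot yet, so they are never selected.
            boolean older = e.getValue() < latestCompleted;
            if (unreferenced && older) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
{code}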


What do you think about that?



[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-12-04 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708978#comment-16708978
 ] 

Yun Tang commented on FLINK-10930:
--

Speaking of eager and proper discarding of useless state: this is a real pain 
point for us, since our production jobs can have 200TB+ of state, which means 
even 1% of useless state left behind could occupy 2TB of logical space in our 
DFS. I fully agree that cleaning up exclusive data is much easier with the 
current checkpoint directory layout. However, I think the current layout does 
not help with cleaning up shared state, which occupies most of the space in 
each checkpoint and still requires listing the files under the shared state 
directory. We have implemented a similar cleanup mechanism: it uses a new 
{{listStatusIterator}} API to reduce the pressure on the DFS and the memory 
footprint in the JM, looks up the files in a refactored snapshot registry, and 
then discards useless state handles asynchronously.
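The idea behind an iterator-style listing can be illustrated on a local file system with {{java.nio.file.DirectoryStream}}, which hands out entries one at a time rather than returning a single array with the whole listing. This is only an analogy in Java, not the {{listStatusIterator}} API mentioned above; how lazily entries are actually fetched depends on the file system provider, and the class name is hypothetical.

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical local-file-system analogy of an iterator-based directory listing. */
final class StreamingLister {
    /** Calls action for each entry of dir that matches filter; returns how many matched. */
    static long forEachMatching(Path dir, Predicate<Path> filter, Consumer<Path> action) throws IOException {
        long matched = 0;
        // Entries are consumed one by one; no full listing is built by this code.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (filter.test(entry)) {
                    action.accept(entry);
                    matched++;
                }
            }
        }
        return matched;
    }
}
{code}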

After the above discussion with [~StephanEwen], I now prefer to close this 
issue. We should focus on handling checkpoint exceptions better and on failing 
on the JM's side when certain conditions are met.



[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-30 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704844#comment-16704844
 ] 

Stephan Ewen commented on FLINK-10930:
--

Concerning the cleanup of lingering "chk-xxx" directories: a while ago we were 
thinking about implementing a periodic cleanup daemon, something that would 
check every x minutes which retained checkpoints still exist and delete all 
exclusive directories older than those.

Such a cleanup daemon would have the advantage that it captures all cases where 
data is left behind, including those where a TM failed before sending the 
message to the JM, so that no state handle for that state exists anywhere 
(nothing on which we could call "delete and parent delete"). Naturally, that 
does not mean we should rely on it exclusively; eager and proper state release 
is still desirable.

It is also possible to build a cleanup daemon for shared state, but it is a bit 
trickier.
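A periodic daemon of this kind could be sketched in Java with a {{ScheduledExecutorService}}. The two callbacks (where the oldest retained checkpoint id comes from, and what a sweep deletes) are hypothetical hooks rather than Flink APIs, and the interval would be a configuration choice.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a periodic exclusive-state cleanup daemon. */
final class PeriodicCleanupDaemon implements AutoCloseable {
    private final ScheduledExecutorService executor;
    private final LongSupplier oldestRetainedCheckpoint; // hypothetical hook
    private final LongConsumer sweepOlderThan;           // hypothetical hook: deletes state older than the id
    private final AtomicLong completedSweeps = new AtomicLong();

    PeriodicCleanupDaemon(LongSupplier oldestRetainedCheckpoint, LongConsumer sweepOlderThan) {
        this.oldestRetainedCheckpoint = oldestRetainedCheckpoint;
        this.sweepOlderThan = sweepOlderThan;
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-cleanup-daemon");
            t.setDaemon(true);
            return t;
        });
    }

    void start(long interval, TimeUnit unit) {
        executor.scheduleWithFixedDelay(this::sweepOnce, interval, interval, unit);
    }

    /** Runs one sweep; also callable directly. */
    void sweepOnce() {
        try {
            sweepOlderThan.accept(oldestRetainedCheckpoint.getAsLong());
            completedSweeps.incrementAndGet();
        } catch (RuntimeException e) {
            // Swallowed so the schedule keeps running; the next sweep retries.
            System.err.println("cleanup sweep failed: " + e);
        }
    }

    long completedSweeps() {
        return completedSweeps.get();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
{code}

Exceptions are caught inside {{sweepOnce}} because {{scheduleWithFixedDelay}} suppresses all later runs once a scheduled task throws.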




[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-29 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704286#comment-16704286
 ] 

Yun Tang commented on FLINK-10930:
--

[~StephanEwen] I think your suggestion to refactor the 
{{CheckpointExceptionHandler}} mechanism is more reasonable: the task side 
should always decline the checkpoint instead of just failing the task. Maybe we 
should pick up FLINK-4810 again so that the checkpoint coordinator can fail 
after "n" unsuccessful checkpoints.

There are some details to settle:
 # Whether we should also fail the whole execution graph after "n" *expired* 
checkpoints.
 # If the checkpoint coordinator receives a late message for an expired 
checkpoint, it should simply delete the file state handle's parent folder 
(that is, the {{chk-i}} folder) to avoid leaving untouched {{chk-i}} folders 
behind.
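The policy of failing only after "n" unsuccessful checkpoints, together with the open question about expired checkpoints, can be sketched in Java as follows. This assumes failures are counted consecutively and reset by a successful checkpoint, which is only one possible interpretation; the class and enum names are hypothetical.

{code:java}
/** Hypothetical sketch of a coordinator-side checkpoint failure policy. */
final class CheckpointFailurePolicy {
    enum Outcome { COMPLETED, DECLINED, EXPIRED }

    private final int maxConsecutiveFailures;
    private final boolean countExpirations;
    private int consecutiveFailures;

    CheckpointFailurePolicy(int maxConsecutiveFailures, boolean countExpirations) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.countExpirations = countExpirations;
    }

    /** Records an outcome; returns true if the job should now fail. */
    boolean onCheckpointOutcome(Outcome outcome) {
        switch (outcome) {
            case COMPLETED:
                consecutiveFailures = 0;
                return false;
            case EXPIRED:
                if (!countExpirations) {
                    return false; // ignored: neither counted nor resetting the counter
                }
                // fall through: an expiration counts as a failure
            case DECLINED:
                consecutiveFailures++;
                return consecutiveFailures >= maxConsecutiveFailures;
            default:
                throw new IllegalStateException("unknown outcome " + outcome);
        }
    }
}
{code}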

> Refactor checkpoint directory layout
> 
>
> Key: FLINK-10930
> URL: https://issues.apache.org/jira/browse/FLINK-10930
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.8.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.8.0
>
>
> The current checkpoint directory layout is introduced from FLINK-8531 with 
> three different scopes for tasks:
>  * *EXCLUSIVE* is for state that belongs to one checkpoint only, meta data 
> and operator state files.
>  * *SHARED* is for state that is possibly part of multiple checkpoints
>  * *TASKOWNED* is for state that must never by dropped by the jobManager.
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- chk-1/  // metadata and operator-state files
>   +-- chk-2/
> ...{code}
> If we retain just one complete checkpoint, only one exclusive directory, 
> namely its {{chk-id}} checkpoint directory, should be left. 
> However, as described in FLINK-10855, the directories of failed/expired 
> checkpoints are left behind as well. This is really confusing for users who [use 
> externalized checkpoints to resume a 
> job|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint],
>  not to mention the resource leak of checkpoint directories. 
>  As far as I know, as long as the {{chk-id}} checkpoint directory still 
> contains the operator state files, there is no graceful way to clean up a useless 
> {{chk-id}} checkpoint directory. Once the JobManager disposes of a 
> failed/expired checkpoint, the JM deletes the target {{chk-id}} checkpoint 
> directory. However, this directory may also be created by tasks that have 
> not yet reported to the JM. When the {{checkpoint coordinator}} receives messages 
> from those late tasks, it discards the useless handles. However, if the JM 
> also deleted the empty parent folder (which is no longer supported 
> since FLINK-8540), another task still uploading operator state files would hit an 
> exception because the parent directory of its write target has just been removed. 
> Currently, we handle a task checkpoint failure as a task failure, so the whole 
> job fails over, which is not what we want.
> From what I see, I plan to split the *EXCLUSIVE* directory into two kinds of 
> exclusive directories: one is still the per-checkpoint {{chk-id}} directories, 
> which contain only their exclusive {{meta data}}; the other is a single 
> directory named {{exclusive}}, which contains the operator state files. 
> Since operator state files are exclusive to exactly one checkpoint, we could 
> also add the {{checkpoint-id}} to their file names so that users can easily clean 
> them up.
>  The refactored directory layout would be:
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- exclusive/  // operator state files
>   +-- chk-1/      // metadata
>   +-- chk-2/
> ...{code}
>  
> This new directory layout would not affect users who use externalized checkpoints 
> to resume jobs, since they still just pass the 
> {{/user-defined-dir/job-id/chk-id}} path to resume a job.
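The file-naming part of the proposal could look roughly like this. The {{<checkpoint-id>-<uuid>}} naming scheme and the {{ExclusiveDirLayout}} class are illustrative assumptions, not existing Flink code; the point is that an id embedded in each file name lets users or scripts drop all operator state files of old checkpoints from {{exclusive/}} without reading any metadata.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the proposed exclusive/ naming; not a Flink API.
class ExclusiveDirLayout {
    private final Path exclusiveDir;

    ExclusiveDirLayout(Path jobDir) {
        this.exclusiveDir = jobDir.resolve("exclusive");
    }

    /** Builds a path such as exclusive/14-<random uuid> that embeds the checkpoint id. */
    Path newStateFile(long checkpointId) {
        return exclusiveDir.resolve(checkpointId + "-" + UUID.randomUUID());
    }

    /** Parses the checkpoint id back out of a name produced by newStateFile. */
    static long checkpointIdOf(Path file) {
        String name = file.getFileName().toString();
        return Long.parseLong(name.substring(0, name.indexOf('-')));
    }

    /** Deletes all operator state files of checkpoints older than oldestRetainedId. */
    int deleteOlderThan(long oldestRetainedId) throws IOException {
        List<Path> stale = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(exclusiveDir)) {
            for (Path f : files) {
                if (checkpointIdOf(f) < oldestRetainedId) {
                    stale.add(f);
                }
            }
        }
        for (Path f : stale) {
            Files.delete(f); // delete after iterating so the directory stream is not modified mid-walk
        }
        return stale.size();
    }
}
```

The caller is responsible for passing the id of the oldest checkpoint that is still retained; the sketch itself does not know which checkpoints completed.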



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-28 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701774#comment-16701774
 ] 

Stephan Ewen commented on FLINK-10930:
--

Sorry, the first point was a typo on my side. It should have said that "the 
checkpoint cannot complete anyway, because it has already timed out". So the 
failure is not a problem per se.

I think changing the layout fixes the problem at the wrong end: it tries to 
mitigate the symptom rather than fix the cause.
My feeling is that the right approach is to better handle what failures do to 
checkpoints:

  - A failure when materializing the checkpoint results in a "checkpoint 
declined" message, but should never directly trigger a task failure on the 
TaskManager.

  - The checkpoint coordinator receives the decline message. If the checkpoint 
is already disposed (timeout or another failure), there is no problem: simply 
ignore the exception (and clean up the state chunk).

  - If the checkpoint fails because of the decline message, the coordinator can 
decide whether this should cause a failure of the task or not (this also 
enables strategies like "fail only if n consecutive checkpoints failed").
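The coordinator-side decision described above can be sketched as a small policy object. This is a sketch only: {{DeclinePolicy}}, its {{Action}} values and the threshold parameter are hypothetical names, and the "n consecutive failures" rule is just one of the possible strategies.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical coordinator-side policy; names are illustrative, not Flink APIs.
class DeclinePolicy {
    enum Action { IGNORE, CHECKPOINT_FAILED, TRIGGER_FAILURE }

    private final int failAfterConsecutive; // the "n" in "fail only if n consecutive checkpoints failed"
    private final Set<Long> disposed = new HashSet<>();
    private int consecutiveFailures;

    DeclinePolicy(int failAfterConsecutive) {
        this.failAfterConsecutive = failAfterConsecutive;
    }

    /** Called when a checkpoint is disposed for another reason, e.g. a timeout. */
    void onDisposed(long checkpointId) {
        disposed.add(checkpointId);
    }

    /** A successful checkpoint resets the failure streak. */
    void onCompleted(long checkpointId) {
        consecutiveFailures = 0;
    }

    /** Called when a task declines checkpointId instead of failing itself. */
    Action onDecline(long checkpointId) {
        if (disposed.contains(checkpointId)) {
            return Action.IGNORE; // already timed out or failed: only clean up the state chunk
        }
        disposed.add(checkpointId); // this decline fails the checkpoint
        consecutiveFailures++;
        return consecutiveFailures >= failAfterConsecutive
                ? Action.TRIGGER_FAILURE
                : Action.CHECKPOINT_FAILED;
    }
}
```

In this sketch a checkpoint that is disposed by a timeout does not count toward the consecutive failures; whether expired checkpoints should count is an open question in this thread.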





[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-27 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701359#comment-16701359
 ] 

Yun Tang commented on FLINK-10930:
--

[~StephanEwen] Let me try to answer your points:
 * _The error is not a real problem, because the checkpoint would complete 
anyway. You can set Flink to not trigger recovery when checkpoint 
materialization fails, but simply continue._

Actually, the exception usually happens after the checkpoint has expired due 
to timeout, which means the checkpoint would not complete anyway. We can call 
CheckpointConfig#setFailOnCheckpointingErrors(false) to avoid the job failover. 
However, since we cannot guarantee that a checkpoint never expires, we would 
effectively always have to set it, which makes the feature of failing the 
operator when a checkpoint fails worthless.
 * _We can explicitly catch that exception and decline the checkpoint in that 
case_

The exception is just an IOException saying "Could not flush and close the file 
system output stream", which is not easy to catch explicitly. Moreover, if we 
always declined the checkpoint on an IOException, this would undermine the 
feature of failing the operator when a checkpoint fails.
 * _We can make sure that file creation ensures existence of parent directories_

Actually, when we create the file, the parent directory still exists; it is 
deleted only after we dispose of the expired checkpoint. The exception happens 
when we try to flush data to the target stream.

 


[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-27 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700528#comment-16700528
 ] 

Stephan Ewen commented on FLINK-10930:
--

The layout refactoring is not the right solution to this problem in my opinion.

  - The error is not a real problem, because the checkpoint would complete 
anyway. You can set Flink to not trigger recovery when checkpoint 
materialization fails, but simply continue.

  - We can explicitly catch that exception and decline the checkpoint in that 
case

  - We can make sure that file creation ensures existence of parent directories
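The last two suggestions can be combined on the task side. The sketch below uses {{java.nio.file}} instead of Flink's {{FileSystem}} abstraction, and the {{StateWriter}} class with its {{writeState}} helper is hypothetical: it re-creates the parent {{chk-x}} directory before writing and turns any {{IOException}} into a declined checkpoint rather than a task failure.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical task-side sketch; not Flink's checkpoint stream API.
class StateWriter {
    enum Result { ACKNOWLEDGED, DECLINED }

    static Result writeState(Path stateFile, byte[] data) {
        try {
            Path parent = stateFile.getParent();
            if (parent != null) {
                Files.createDirectories(parent); // re-create chk-<id> if it was already deleted
            }
            try (OutputStream out = Files.newOutputStream(stateFile)) {
                out.write(data);
            }
            return Result.ACKNOWLEDGED;
        } catch (IOException e) {
            // Decline this checkpoint instead of failing the task.
            return Result.DECLINED;
        }
    }
}
```

The parent-directory step only helps when the directory is already missing at creation time; when it is deleted while the stream is still being flushed, as reported for HDFS in this thread, only the catch-and-decline path applies.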




[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-25 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698560#comment-16698560
 ] 

Yun Tang commented on FLINK-10930:
--

[~StephanEwen] I know the suggested changes go against your original 
motivation. However, as long as we delete a directory in a DFS that may still 
be used by other tasks to store data in a distributed environment, we cannot 
guarantee that this exception never happens. I think it is quite easy to 
reproduce by just setting a very short checkpoint timeout.

Below is the exception I hit. Judging from the HDFS audit log, it happens in the 
sequence {{task creates state file --> JM deletes chk directory --> task 
fails to write data to stream}}.

{code:java}
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 14 for operator Flat Map -> Flat Map (1/3).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:978)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
    at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.Exception: Could not materialize checkpoint 14 for operator Flat Map -> Flat Map (1/3).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:973)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/xxx/a105a6971970401f25714efa67000518/chk-14/1fdcf28d-aaba-4591-85c4-cdab804207ba in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:61)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/xxx/a105a6971970401f25714efa67000518/chk-14/1fdcf28d-aaba-4591-85c4-cdab804207ba in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:341)
    at org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream.closeAndGetPrimaryHandle(DuplicatingCheckpointOutputStream.java:268)
    at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryAndSecondaryStream.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:115)
    at org.apache.flink.contrib.streaming.state.RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBIncrementalSnapshotOperation.java:383)
    at org.apache.flink.contrib.streaming.state.RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBIncrementalSnapshotOperation.java:146)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:59)
    ... 6 more
Caused by: java.io.FileNotFoundException: File does not exist: /xxx/a105a6971970401f25714efa67000518/chk-14/1fdcf28d-aaba-4591-85c4-cdab804207ba (inode 60074501) [Lease.  Holder: DFSClient_NONMAPREDUCE_-103338934_91, pending creates: 1]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2623)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2503)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:828)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:512)
    ...
{code}

[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-25 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698333#comment-16698333
 ] 

Stephan Ewen commented on FLINK-10930:
--

The suggested changes would undo quite a lot of what motivated the original 
change, and there may also be some confusion in the problem motivation.

The "chk-x" directories are explicitly exclusive. Users and cleanup daemons 
should be able to drop these directories when they are old enough, deleting all 
data that is exclusive to that checkpoint. This also makes sense for 
externalized checkpoints.

The issue of TaskManagers being unable to write state files after the 
JobManager has deleted the parent directory "chk-x" can be addressed differently. 
(Can you describe where this happens? Most file systems always re-create missing 
parent directories and should never fail with such an exception.)
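As an illustration of such a cleanup daemon, here is a hypothetical external janitor (not part of Flink) that removes {{chk-*}} directories whose modification time is older than a given age. It never touches {{shared/}} or {{taskowned/}}, and, as a simplifying assumption, it always keeps the {{chk-*}} directory with the highest id instead of asking which checkpoint is actually retained.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical external cleanup tool; not part of Flink.
class ChkDirJanitor {
    /** Deletes chk-* directories last modified before now - maxAge, always keeping the highest id. */
    static List<Path> cleanUp(Path jobDir, Duration maxAge, Instant now) throws IOException {
        List<Path> chkDirs = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(jobDir, "chk-*")) {
            for (Path d : dirs) {
                if (Files.isDirectory(d)) {
                    chkDirs.add(d);
                }
            }
        }
        chkDirs.sort(Comparator.comparingLong(ChkDirJanitor::idOf)); // numeric order, highest id last
        Instant cutoff = now.minus(maxAge);
        List<Path> deleted = new ArrayList<>();
        for (int i = 0; i < chkDirs.size() - 1; i++) { // skip the last (highest-id) directory
            Path d = chkDirs.get(i);
            if (Files.getLastModifiedTime(d).toInstant().isBefore(cutoff)) {
                deleteRecursively(d);
                deleted.add(d);
            }
        }
        return deleted;
    }

    static long idOf(Path chkDir) {
        return Long.parseLong(chkDir.getFileName().toString().substring("chk-".length()));
    }

    private static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(dir)) { // pre-order: parents before children
            walk.forEach(paths::add);
        }
        for (int i = paths.size() - 1; i >= 0; i--) {
            Files.delete(paths.get(i)); // reverse order deletes children first
        }
    }
}
```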

