[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217724#comment-17217724 ]

Roman Khachatryan commented on FLINK-19596:
---
Thanks for the clarification [~wind_ljy]. FYI, I've published a [PR|https://github.com/apache/flink/pull/13709] for FLINK-19401.

> Do not recover CompletedCheckpointStore on each failover
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.2
> Reporter: Jiayi Liao
> Priority: Critical
> Fix For: 1.12.0
>
> {{completedCheckpointStore.recover()}} in {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover because the {{CompletedCheckpointStore}} needs to load HDFS files to instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, which means {{completedCheckpointStore.recover()}} would be called tens of times since the tasks are not in a failover region.
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
> completedCheckpointStore.recover();
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216590#comment-17216590 ]

Jiayi Liao commented on FLINK-19596:
[~roman_khachatryan] In the losing-regaining-leadership case, the {{CompletedCheckpointStore}} is currently re-constructed because the {{CompletedCheckpointStore}} and the {{ExecutionGraph}} share the same lifecycle for now (which, from my perspective, should be decoupled). I think your solution is better: we don't need to reason about the {{CompletedCheckpointStore}}'s lifecycle, and it will also keep working if the {{CompletedCheckpointStore}} is decoupled from the {{ExecutionGraph}} in the future (which may well happen).
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216581#comment-17216581 ]

Roman Khachatryan commented on FLINK-19596:
---
Hi, I was thinking about a solution for the related issue FLINK-19401. What I came up with is skipping the loading of checkpoints from the FS *after listing them in ZK* inside {{ZooKeeperCompletedCheckpointStore.recover()}}, if they are already loaded.
It seems, [~wind_ljy], that your proposal is similar, but it also skips the listing of checkpoints in ZK. Is that correct? If so, wouldn't it allow restoring from a checkpoint that is not the latest one (e.g. in the case of losing and regaining leadership)?
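The idea above, keeping the ZK listing but skipping the FS reads for checkpoints that are already in memory, can be sketched roughly as follows. This is a minimal illustrative model, not Flink's actual {{ZooKeeperCompletedCheckpointStore}}; the class and method names are made up for the sketch, and the storage read is a stand-in for loading checkpoint metadata from HDFS:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: recover() always lists the checkpoint ids (as ZooKeeper would),
// but reads from storage only the ids that are not already cached.
public class LazyRecoverSketch {
    private final Map<Long, String> loaded = new HashMap<>(); // id -> checkpoint
    private int storageReads = 0;                             // counts simulated FS reads

    // stand-in for reading checkpoint metadata from the file system
    private String readFromStorage(long id) {
        storageReads++;
        return "checkpoint-" + id;
    }

    public void recover(List<Long> idsInZooKeeper) {
        // drop cached entries that ZK no longer lists (subsumed/removed checkpoints)
        loaded.keySet().retainAll(new HashSet<>(idsInZooKeeper));
        // load only the ids we have not seen before
        for (long id : idsInZooKeeper) {
            loaded.computeIfAbsent(id, this::readFromStorage);
        }
    }

    public int getStorageReads() { return storageReads; }
    public Set<Long> getLoadedIds() { return loaded.keySet(); }

    public static void main(String[] args) {
        LazyRecoverSketch store = new LazyRecoverSketch();
        store.recover(Arrays.asList(1L, 2L, 3L));
        store.recover(Arrays.asList(2L, 3L, 4L)); // only id 4 needs a storage read
        System.out.println(store.getStorageReads());      // 4 reads total, not 6
        System.out.println(store.getLoadedIds().size());  // 3 checkpoints cached
    }
}
```

Because the listing still happens on every recovery, the store always sees the latest set of ids, which addresses the losing-regaining-leadership concern while avoiding the repeated HDFS reads.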
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216563#comment-17216563 ]

Till Rohrmann commented on FLINK-19596:
---
Thanks for creating this issue [~wind_ljy]. I think you are right that we should only recover the checkpoints once during a single leader session. Ideally, this should happen when the {{JobMaster}} is started. To do it properly, we would have to move the {{CompletedCheckpointStore}} from the {{ExecutionGraph}} to the {{JobMaster}} level.
If this is too involved, then a lazy loading strategy sounds good to me. [~roman_khachatryan] proposed in a private thread to check whether the set of registered checkpoint ids has changed, and only if it has to load the checkpoints again.
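The change-detection variant mentioned above could look roughly like this. Again a hypothetical sketch with illustrative names, not Flink code; the "full reload" counter stands in for re-reading all checkpoint metadata files:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch: remember the set of registered checkpoint ids from the last
// recovery, and only re-read the checkpoints when that set has changed.
public class ChangeDetectingRecoverSketch {
    private Set<Long> knownIds = new HashSet<>();
    private int fullReloads = 0; // counts simulated "load everything from HDFS" passes

    public void recover(Set<Long> idsInZooKeeper) {
        if (idsInZooKeeper.equals(knownIds)) {
            return; // ids unchanged since the last failover: skip the expensive reads
        }
        fullReloads++; // stand-in for re-reading the checkpoint metadata files
        knownIds = new HashSet<>(idsInZooKeeper);
    }

    public int getFullReloads() { return fullReloads; }

    public static void main(String[] args) {
        ChangeDetectingRecoverSketch store = new ChangeDetectingRecoverSketch();
        store.recover(new HashSet<>(Arrays.asList(1L, 2L)));
        store.recover(new HashSet<>(Arrays.asList(1L, 2L))); // no change: skipped
        store.recover(new HashSet<>(Arrays.asList(2L, 3L))); // changed: reloaded
        System.out.println(store.getFullReloads()); // prints 2
    }
}
```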
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216541#comment-17216541 ]

Jiayi Liao commented on FLINK-19596:
[~yunta] I didn't know there was already a JIRA for this. Thanks for the reminder; you can re-assign it to me.
I agree with [~zjwang] that it may not be a good solution to couple {{CompletedCheckpointStore.recover()}} with the type of failover. The best solution that comes to my mind for now is the same as [~zjwang]'s: let the {{CompletedCheckpointStore}} recover itself, if it has not been recovered yet, when the {{CheckpointCoordinator}} or other components try to access the states in the {{CompletedCheckpointStore}}. Specifically, I'm going to add a new method {{boolean isCheckpointStoreInitialized()}} to {{CompletedCheckpointStore}} and allow different implementations based on its root component.
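The proposed interface addition could be sketched as below. The interface, the default method, and the implementation class are illustrative only (Flink's real {{CompletedCheckpointStore}} has more methods and different names); the sketch just shows how callers would go through a recover-if-needed path so that only the first failover in a session pays the recovery cost:

```java
// Hypothetical sketch of the proposed interface change; names are illustrative.
interface CheckpointStoreSketch {
    boolean isCheckpointStoreInitialized();

    void recover();

    // callers use this instead of recover(); implementations decide
    // whether recovery already happened in this session
    default void recoverIfNeeded() {
        if (!isCheckpointStoreInitialized()) {
            recover();
        }
    }
}

public class InitOnceStoreSketch implements CheckpointStoreSketch {
    private boolean initialized = false;
    private int recoverCalls = 0; // counts simulated HDFS recovery passes

    @Override
    public boolean isCheckpointStoreInitialized() {
        return initialized;
    }

    @Override
    public void recover() {
        recoverCalls++; // stand-in for loading checkpoints from HDFS
        initialized = true;
    }

    public int getRecoverCalls() { return recoverCalls; }

    public static void main(String[] args) {
        InitOnceStoreSketch store = new InitOnceStoreSketch();
        store.recoverIfNeeded(); // first failover: recovers
        store.recoverIfNeeded(); // later failovers: skipped
        System.out.println(store.getRecoverCalls()); // prints 1
    }
}
```

A high-availability implementation could make {{isCheckpointStoreInitialized()}} return false again when leadership changes, so a new leader session still triggers a fresh recovery.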
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216530#comment-17216530 ]

Yun Tang commented on FLINK-19596:
--
[~wind_ljy] this issue duplicates FLINK-6984, and I think it deserves a fix, although not at a very high priority. I took the FLINK-6984 ticket previously but have not submitted any valid PR yet. I could re-assign that ticket to you if you have the interest and time.
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216520#comment-17216520 ]

Zhijiang commented on FLINK-19596:
--
I heard this issue was proposed a long time ago by [~tangyun]; maybe he knows some additional background.
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216517#comment-17216517 ]

Zhijiang commented on FLINK-19596:
--
Thanks for proposing this potential improvement [~wind_ljy]! +1 for the motivation.
If I understood correctly, {{ZooKeeperCompletedCheckpointStore#completedCheckpoints}} is not loaded during class construction (lazy loading), and it is updated in an append-only way by subsequent {{CheckpointCoordinator#completePendingCheckpoint}} calls. Once {{CheckpointCoordinator#restoreLatestCheckpointedStateInternal}} is called, it lazily loads all the completed checkpoints via {{CompletedCheckpointStore#recover}}. So there is some duplicate overhead here: already-completed checkpoints are read twice.
Regarding the solution, I am not quite sure we can couple this decision ({{CompletedCheckpointStore#recover}}) to global/local/regional failover. E.g. a JM leader change causes a global failover now, but if we improve that in the future, it might not need a job restart thanks to reconciling. It might make more sense to refactor {{completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery)}} to cover the internal logic of {{completedCheckpointStore.recover()}}. I mean, we only care about getting the latest checkpoint via the {{#getLatestCheckpoint}} interface, and its internal implementation can judge whether it further needs to call {{#recover}} to load the previously completed checkpoints.
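The refactoring sketched above, where {{#getLatestCheckpoint}} hides the recovery decision from callers, could look roughly like this. A hypothetical, simplified model with illustrative names and hard-coded checkpoint ids, not Flink's actual store:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: callers only use getLatestCheckpoint(); the store internally
// decides whether it still needs to run recover() first.
public class LazyLatestCheckpointSketch {
    private List<Long> completedCheckpoints; // null until recovered
    private int recoverCalls = 0;

    private void recover() {
        recoverCalls++; // stand-in for listing ZK and reading HDFS metadata
        completedCheckpoints = new ArrayList<>(Arrays.asList(7L, 8L, 9L));
    }

    // the only public entry point; #recover becomes an internal detail
    public long getLatestCheckpoint() {
        if (completedCheckpoints == null) {
            recover();
        }
        return completedCheckpoints.get(completedCheckpoints.size() - 1);
    }

    public int getRecoverCalls() { return recoverCalls; }

    public static void main(String[] args) {
        LazyLatestCheckpointSketch store = new LazyLatestCheckpointSketch();
        System.out.println(store.getLatestCheckpoint()); // prints 9
        store.getLatestCheckpoint(); // second failover: no new recovery
        System.out.println(store.getRecoverCalls());     // prints 1
    }
}
```

The design choice here is that failover handling never has to know why recovery is happening; the store alone tracks whether its view is stale.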
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17215808#comment-17215808 ]

Nicholas Jiang commented on FLINK-19596:
[~wind_ljy], I agree with the point you mentioned. The {{CheckpointCoordinator}} restores the latest checkpointed state in several cases, including local or regional failover, a global restore-style operation, and restoring state from a savepoint. The {{CheckpointCoordinator}} doesn't need to recover the checkpoints on each local or regional failover, because this kind of failover does not involve a leadership change. We could add an isGlobalRecovery check for the {{completedCheckpointStore.recover()}} caller. cc [~trohrmann]
[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17212851#comment-17212851 ]

Jiayi Liao commented on FLINK-19596:
Currently {{restoreLatestCheckpointedStateInternal}} is invoked in three cases:
1. Non-global recovery.
2. Global recovery.
3. Savepoint restore.
Since it's hard to identify the reason for a global recovery (a JM leadership change or a real global failover), we might be able to skip the recovery of the {{completedCheckpointStore}} on non-global recovery.
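The case split above can be sketched as a simple guard. The enum and method names are made up for illustration (Flink does not expose a restore reason this way); the counter stands in for the expensive {{completedCheckpointStore.recover()}} call:

```java
// Sketch: recover the store only for global recovery and savepoint restore,
// and skip it for non-global (regional) failover. Names are illustrative.
public class RestoreReasonSketch {
    enum RestoreReason { NON_GLOBAL_RECOVERY, GLOBAL_RECOVERY, SAVEPOINT_RESTORE }

    private int recoverCalls = 0; // counts simulated completedCheckpointStore.recover() calls

    void restoreLatestCheckpointedState(RestoreReason reason) {
        if (reason != RestoreReason.NON_GLOBAL_RECOVERY) {
            recoverCalls++; // stand-in for completedCheckpointStore.recover()
        }
        // ... then pick the latest checkpoint and assign state to tasks ...
    }

    int getRecoverCalls() { return recoverCalls; }

    public static void main(String[] args) {
        RestoreReasonSketch coordinator = new RestoreReasonSketch();
        coordinator.restoreLatestCheckpointedState(RestoreReason.GLOBAL_RECOVERY);
        coordinator.restoreLatestCheckpointedState(RestoreReason.NON_GLOBAL_RECOVERY);
        coordinator.restoreLatestCheckpointedState(RestoreReason.NON_GLOBAL_RECOVERY);
        System.out.println(coordinator.getRecoverCalls()); // prints 1
    }
}
```

This matches the machine-failure scenario from the issue description: tens of regional task failovers would then trigger zero {{recover()}} calls instead of tens.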