[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-20 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217724#comment-17217724
 ] 

Roman Khachatryan commented on FLINK-19596:
---

Thanks for the clarification, [~wind_ljy].

FYI, I've published a [PR|https://github.com/apache/flink/pull/13709] for 
FLINK-19401.

> Do not recover CompletedCheckpointStore on each failover
> 
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Critical
> Fix For: 1.12.0
>
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case, described below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} would be called tens 
> of times since the tasks are not in the same failover region.
> I also noticed there is a TODO in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216590#comment-17216590
 ] 

Jiayi Liao commented on FLINK-19596:


[~roman_khachatryan] In the losing-and-regaining-leadership case, the 
{{CompletedCheckpointStore}} is currently re-constructed, because the 
{{CompletedCheckpointStore}} and the {{ExecutionGraph}} share the same lifecycle 
for now (which, from my perspective, should be decoupled).

I think your solution is better, because this way we don't need to reason about 
the {{CompletedCheckpointStore}}'s lifecycle, and it would also keep working if 
{{CompletedCheckpointStore}} is decoupled from {{ExecutionGraph}} in the future 
(which may well happen).




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216581#comment-17216581
 ] 

Roman Khachatryan commented on FLINK-19596:
---

Hi,
I was thinking about a solution for the related issue FLINK-19401.
What I came up with is skipping the loading of checkpoints from the FS *after 
listing them in ZK* inside ZooKeeperCompletedCheckpointStore.recover() - if they 
are already loaded (a rough sketch follows below).

It seems, [~wind_ljy], that your proposal is similar, but that it also skips the 
listing of checkpoints in ZK. Is that correct?
If so, wouldn't it allow restoring from a checkpoint that is not the latest one 
(e.g. in case of losing and regaining leadership)?
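
To make the idea concrete, here is a rough sketch of such a recover() (not the 
actual ZooKeeperCompletedCheckpointStore code; the CheckpointHandle type and the 
helpers getCheckpointHandlesFromZooKeeper() / retrieveCheckpointFromStateHandle() 
are illustrative placeholders): the store still lists the checkpoint handles 
registered in ZK, but only reads from the file system the checkpoints it does 
not already hold in memory.

{code:java}
public void recover() throws Exception {
    // Always list the checkpoint handles registered in ZooKeeper, so a new leader
    // still sees the complete, up-to-date set of checkpoints.
    List<CheckpointHandle> handles = getCheckpointHandlesFromZooKeeper();

    // Index the checkpoints that are already loaded in memory by checkpoint id.
    Map<Long, CompletedCheckpoint> loaded = new HashMap<>();
    for (CompletedCheckpoint checkpoint : completedCheckpoints) {
        loaded.put(checkpoint.getCheckpointID(), checkpoint);
    }

    List<CompletedCheckpoint> recovered = new ArrayList<>(handles.size());
    for (CheckpointHandle handle : handles) {
        CompletedCheckpoint cached = loaded.get(handle.getCheckpointId());
        if (cached != null) {
            // Already loaded during this leader session - skip the expensive FS read.
            recovered.add(cached);
        } else {
            // Only newly discovered checkpoints are read from the file system.
            recovered.add(retrieveCheckpointFromStateHandle(handle));
        }
    }

    completedCheckpoints.clear();
    completedCheckpoints.addAll(recovered);
}
{code}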




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216563#comment-17216563
 ] 

Till Rohrmann commented on FLINK-19596:
---

Thanks for creating this issue [~wind_ljy]. I think you are right that we 
should only recover the checkpoints once during a single leader session. 
Ideally, this should happen when the {{JobMaster}} is started. In order to do 
it properly, we would have to move the {{CompletedCheckpointStore}} from the 
{{ExecutionGraph}} to the {{JobMaster}} level. If this is too involved, then 
using a lazy loading strategy sounds good to me. [~roman_khachatryan] proposed 
in a private thread to check whether the set of registered checkpoint ids has 
changed and to reload the checkpoints only if it has (see the sketch below).
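
A minimal sketch of that check (illustrative only; getCheckpointIdsFromZooKeeper() 
and loadCheckpointsFromStateHandles() are assumed helpers, not existing Flink 
methods):

{code:java}
// Ids of the checkpoints that are currently held in memory.
private Set<Long> knownCheckpointIds = new HashSet<>();

public void recover() throws Exception {
    // Cheap metadata-only lookup: which checkpoint ids are registered in ZooKeeper?
    Set<Long> registeredIds = getCheckpointIdsFromZooKeeper();

    if (registeredIds.equals(knownCheckpointIds)) {
        // Nothing changed since the last recovery - skip the expensive HDFS reads.
        return;
    }

    // The registered set changed (e.g. new checkpoints completed or old ones were
    // subsumed), so reload the checkpoint metadata from the file system.
    loadCheckpointsFromStateHandles(registeredIds);
    knownCheckpointIds = registeredIds;
}
{code}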




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216541#comment-17216541
 ] 

Jiayi Liao commented on FLINK-19596:


[~yunta] I didn't know there was already a JIRA for this. Thanks for the 
reminder; you can re-assign it to me.

I agree with [~zjwang] that coupling {{CompletedCheckpointStore.recover()}} with 
the type of failover may not be a good solution. The best solution that comes to 
my mind for now is the same as [~zjwang]'s: let the {{CompletedCheckpointStore}} 
recover itself, if it has not been recovered yet, when the 
{{CheckpointCoordinator}} or other components try to access the states in the 
{{CompletedCheckpointStore}}.

Specifically, I'm going to add a new method {{boolean 
isCheckpointStoreInitialized()}} to {{CompletedCheckpointStore}} and allow 
different implementations based on its root component (see the sketch below).
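
A rough sketch of how that could look (the interface view is trimmed down, and 
ensureCheckpointStoreRecovered() is an assumed usage pattern, not a final 
design):

{code:java}
public interface CompletedCheckpointStore {

    /** New method: has the store already loaded its checkpoints in this leader session? */
    boolean isCheckpointStoreInitialized();

    void recover() throws Exception;

    CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception;

    // ... existing methods (addCheckpoint, shutdown, ...) omitted ...
}

// Possible usage in the CheckpointCoordinator (or whichever "root component" owns the store):
private void ensureCheckpointStoreRecovered() throws Exception {
    if (!completedCheckpointStore.isCheckpointStoreInitialized()) {
        // Only the first access after (re)gaining leadership pays the cost of
        // reading the checkpoint metadata from HDFS.
        completedCheckpointStore.recover();
    }
}
{code}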




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216530#comment-17216530
 ] 

Yun Tang commented on FLINK-19596:
--

[~wind_ljy] this issue duplicates FLINK-6984, and I think it deserves a fix, 
although not with such a high priority.

I took the FLINK-6984 ticket previously but have not submitted a valid PR yet. I 
could re-assign that ticket to you if you have the interest and time.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216520#comment-17216520
 ] 

Zhijiang commented on FLINK-19596:
--

I heard this issue was proposed a long time ago by [~tangyun]; maybe he knows 
some additional background.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216517#comment-17216517
 ] 

Zhijiang commented on FLINK-19596:
--

Thanks for proposing this potential improvement [~wind_ljy]!

+1 for the motivation. 

If I understood correctly, 
`ZooKeeperCompletedCheckpointStore#completedCheckpoints` is not loaded (lazy 
loading) during class construction, and it is then updated in an append-only way 
by the follow-up `CheckpointCoordinator#completePendingCheckpoint`.

Once `CheckpointCoordinator#restoreLatestCheckpointedStateInternal` is called, it 
lazily loads all the completed checkpoints via `CompletedCheckpointStore#recover`. 
So there is some duplicate overhead here: an already completed checkpoint ends up 
being read twice.

Regarding the solution, I am not quite sure whether we can couple this decision 
(`CompletedCheckpointStore#recover`) with global/local/regional failover. E.g. a 
JM leader change causes a global failover today, but if we improve that in the 
future, it might not need a job restart thanks to reconciling.

If we can refactor 
`completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery)` so 
that it covers the internal logic of `completedCheckpointStore.recover()`, it 
might make more sense. I mean that callers only care about getting the latest 
checkpoint via the `#getLatestCheckpoint` interface, and its internal 
implementation can judge whether it still needs to call `#recover` to load the 
previously completed checkpoints or not (a sketch follows below).
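
A minimal sketch of that refactoring idea (illustrative only; the `recovered` 
flag and the `findLatest` helper are assumptions, not existing code, and the 
real selection logic is more involved):

{code:java}
// Inside a CompletedCheckpointStore implementation (e.g. the ZooKeeper-based one).
private boolean recovered = false;

@Override
public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery)
        throws Exception {
    // The caller only asks for the latest checkpoint; whether a (re)load from the
    // backing storage is needed is decided internally, not at every failover call site.
    if (!recovered) {
        recover();
        recovered = true;
    }
    return findLatest(isPreferCheckpointForRecovery);
}

// Placeholder for the existing selection logic: return the newest entry of the
// in-memory checkpoint list (optionally preferring checkpoints over savepoints
// when isPreferCheckpointForRecovery is set).
private CompletedCheckpoint findLatest(boolean isPreferCheckpointForRecovery) {
    return completedCheckpoints.isEmpty() ? null : completedCheckpoints.getLast();
}
{code}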




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17215808#comment-17215808
 ] 

Nicholas Jiang commented on FLINK-19596:


[~wind_ljy], I agree with the point you mentioned. The CheckpointCoordinator 
restores the latest checkpointed state in several cases: local or regional 
failover, a global restore-style operation, and restoring state from a 
savepoint. The CheckpointCoordinator doesn't need to recover the checkpoints on 
each local or regional failover, because such a failover does not involve a 
leadership change. We could add an isGlobalRecovery check at the 
completedCheckpointStore.recover() call site (see the sketch below).
cc [~trohrmann]
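
To illustrate (a sketch only; the isGlobalRecovery flag and the simplified method 
signature are assumptions, not the current CheckpointCoordinator code):

{code:java}
// Sketch of the call site inside the CheckpointCoordinator (signature simplified).
private boolean restoreLatestCheckpointedStateInternal(
        Set<ExecutionJobVertex> tasks, boolean isGlobalRecovery) throws Exception {

    synchronized (lock) {
        // Only a global recovery (e.g. a new leader session or a full job restart)
        // re-reads the completed checkpoints from ZooKeeper and HDFS; local and
        // regional failovers reuse the checkpoints already held in memory.
        if (isGlobalRecovery) {
            completedCheckpointStore.recover();
        }

        CompletedCheckpoint latest =
                completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
        if (latest == null) {
            return false;
        }

        // ... existing logic: assign the recovered state to the given tasks ...
        return true;
    }
}
{code}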




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-12 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17212851#comment-17212851
 ] 

Jiayi Liao commented on FLINK-19596:


Currently {{restoreLatestCheckpointedStateInternal}} is invoked in the three 
cases below:

1. Non-global recovery.
2. Global recovery.
3. Savepoint restore.

Since it's hard to identify the reason for a global recovery (a JM leadership 
change or a real global failover), we might be able to skip the recovery of 
{{completedCheckpointStore}} on non-global recovery, as sketched below.
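
For illustration, one way to express that distinction (a sketch only; the 
RestoreMode enum and maybeRecoverCheckpointStore() are assumptions, not existing 
code):

{code:java}
// Hypothetical way to tag the three call sites of restoreLatestCheckpointedStateInternal.
enum RestoreMode {
    NON_GLOBAL_RECOVERY,  // regional/local failover, no leadership change
    GLOBAL_RECOVERY,      // full restart, possibly after a JM leadership change
    SAVEPOINT_RESTORE     // explicit restore from a savepoint
}

private void maybeRecoverCheckpointStore(RestoreMode mode) throws Exception {
    // Only non-global recovery is guaranteed to run within an unchanged leader
    // session, so only there can we safely skip re-reading the checkpoint store.
    if (mode != RestoreMode.NON_GLOBAL_RECOVERY) {
        completedCheckpointStore.recover();
    }
}
{code}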




--
This message was sent by Atlassian Jira
(v8.3.4#803005)