[jira] [Comment Edited] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-19 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216520#comment-17216520
 ] 

Zhijiang edited comment on FLINK-19596 at 10/19/20, 8:04 AM:
-------------------------------------------------------------

I heard this issue proposed a long time ago by [~yunta]; maybe he knows some 
additional background.


was (Author: zjwang):
I heard this issue proposed a long time ago by [~tangyun]; maybe he knows 
some additional background.

> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case, described below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} would be called tens 
> of times, since the tasks do not fall into a single failover region.
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}
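
For context, a very rough sketch of why {{recover()}} is expensive: every retained 
checkpoint handle has to be retrieved and deserialized from the DFS again. This is 
illustrative only, not the actual ZooKeeperCompletedCheckpointStore; the class name 
below is made up, while {{RetrievableStateHandle}} and {{CompletedCheckpoint}} are 
existing Flink types.

{code:java}
// Illustrative sketch only -- not the real completed checkpoint store implementation.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.state.RetrievableStateHandle;

public class SimplifiedCompletedCheckpointStore {

    private final List<RetrievableStateHandle<CompletedCheckpoint>> checkpointHandles;
    private final List<CompletedCheckpoint> completedCheckpoints = new ArrayList<>();

    public SimplifiedCompletedCheckpointStore(
            List<RetrievableStateHandle<CompletedCheckpoint>> checkpointHandles) {
        this.checkpointHandles = checkpointHandles;
    }

    public void recover() throws Exception {
        completedCheckpoints.clear();
        // Each retrieveState() call reads and deserializes checkpoint metadata
        // from the DFS (e.g. HDFS). With N retained checkpoints and a failure
        // touching M independent failover regions, this work is repeated N * M times.
        for (RetrievableStateHandle<CompletedCheckpoint> handle : checkpointHandles) {
            completedCheckpoints.add(handle.retrieveState());
        }
    }
}
{code}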



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17215808#comment-17215808
 ] 

Nicholas Jiang edited comment on FLINK-19596 at 10/17/20, 7:44 AM:
-------------------------------------------------------------------

[~wind_ljy], I agree with the point you mentioned. CheckpointCoordinator can 
restore the latest checkpointed state in several cases: local or regional 
failover, a global restore-style operation, and restoring state from a 
savepoint. CheckpointCoordinator doesn't need to recover the checkpoints on 
each local or regional failover, because such a failover does not involve a 
leadership change. We could add an isGlobalRecovery check at the 
completedCheckpointStore.recover() call site, as sketched below.
cc [~trohrmann], [~zjwang]
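
A minimal sketch of that idea, assuming the {{CompletedCheckpointStore}} interface of 
1.11 and a hypothetical {{isGlobalRecovery}} flag supplied by the caller (the helper 
class and names are illustrative, not a patch against {{CheckpointCoordinator}}):

{code:java}
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;

public final class GatedCheckpointStoreRecovery {

    private GatedCheckpointStoreRecovery() {}

    /**
     * Recovers the completed checkpoint store only for a global restore
     * (i.e. when a new leader takes over), then returns the latest completed
     * checkpoint. Local/regional failovers skip the expensive recover() call
     * and reuse the checkpoints already cached in the store.
     */
    public static CompletedCheckpoint restoreLatest(
            CompletedCheckpointStore store,
            boolean isGlobalRecovery,       // hypothetical flag from the caller
            boolean preferCheckpointForRecovery) throws Exception {

        if (isGlobalRecovery) {
            // Re-reads all retained checkpoints from ZooKeeper + DFS.
            store.recover();
        }
        return store.getLatestCheckpoint(preferCheckpointForRecovery);
    }
}
{code}

The caller would pass the flag as true only on the new-leader / global restore path 
that the TODO in the description refers to.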


was (Author: nicholasjiang):
[~wind_ljy], I agree with the point you mentioned. CheckpointCoordinator can 
restore the latest checkpointed state in several cases: local or regional 
failover, a global restore-style operation, and restoring state from a 
savepoint. CheckpointCoordinator doesn't need to recover the checkpoints on 
each local or regional failover, because such a failover does not involve a 
leadership change. We could add an isGlobalRecovery check at the 
completedCheckpointStore.recover() call site.
cc [~trohrmann][~zjwang]

> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case, described below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} would be called tens 
> of times, since the tasks do not fall into a single failover region.
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19596) Do not recover CompletedCheckpointStore on each failover

2020-10-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17215808#comment-17215808
 ] 

Nicholas Jiang edited comment on FLINK-19596 at 10/17/20, 7:43 AM:
-------------------------------------------------------------------

[~wind_ljy], I agree with the point you mentioned. CheckpointCoordinator can 
restore the latest checkpointed state in several cases: local or regional 
failover, a global restore-style operation, and restoring state from a 
savepoint. CheckpointCoordinator doesn't need to recover the checkpoints on 
each local or regional failover, because such a failover does not involve a 
leadership change. We could add an isGlobalRecovery check at the 
completedCheckpointStore.recover() call site.
cc [~trohrmann][~zjwang]


was (Author: nicholasjiang):
[~wind_ljy], I agree with the point you mentioned. CheckpointCoordinator can 
restore the latest checkpointed state in several cases: local or regional 
failover, a global restore-style operation, and restoring state from a 
savepoint. CheckpointCoordinator doesn't need to recover the checkpoints on 
each local or regional failover, because such a failover does not involve a 
leadership change. We could add an isGlobalRecovery check at the 
completedCheckpointStore.recover() call site.
cc [~trohrmann]

> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.2
>Reporter: Jiayi Liao
>Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case, described below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} would be called tens 
> of times, since the tasks do not fall into a single failover region.
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new 
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)