[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407365#comment-17407365 ]

ming li commented on FLINK-22483:
---------------------------------

Hi [~trohrmann], thanks for your reply. I will create a separate issue to follow up later.

> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
>                 Key: FLINK-22483
>                 URL: https://issues.apache.org/jira/browse/FLINK-22483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Robert Metzger
>            Assignee: David Morávek
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially
> long-lasting/blocking operation, for example if the file system
> implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from
> the main thread of the JobManager, making it unresponsive to any RPC call
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to minio.minio.svc:9000 [minio.minio.svc/] failed: Connection refused (Connection refused)
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
> 	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
> 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
> 	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
> 	at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) ~[?:?]
> 	at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) ~[?:?]
> 	at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> 	at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) ~[?:?]
> 	at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) ~[?:1.8.0_282]
> 	at XXX.recover(KubernetesHaCheckpointStore.java:69) ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> 	at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> 	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_282]
> {code}
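As a hedged illustration of the idea behind this ticket (running the blocking recovery off the JobManager's main thread so it stays responsive to RPCs), the following sketch uses a plain `CompletableFuture` handed to an I/O executor. The class name, `recoverBlocking`, and the executor wiring are hypothetical stand-ins, not Flink's actual `CompletedCheckpointStore` API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: off-load a blocking recover() call to an I/O executor so the
// main thread is not stuck while the storage backend retries connections.
public class AsyncRecoverySketch {

    // Stand-in for CompletedCheckpointStore.recover(), which may block on
    // storage retries (e.g. the S3/minio connection refusals in the trace above).
    static List<String> recoverBlocking() {
        try {
            Thread.sleep(50); // simulate slow storage access
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of("chk-41", "chk-42");
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        // Run the blocking recovery on the I/O executor, not the main thread.
        CompletableFuture<List<String>> recovered =
                CompletableFuture.supplyAsync(AsyncRecoverySketch::recoverBlocking, ioExecutor);

        // ... the main thread could keep serving RPCs here instead of blocking;
        // for the sketch we simply wait for the result.
        List<String> checkpoints = recovered.join();
        System.out.println("latest=" + checkpoints.get(checkpoints.size() - 1));
        ioExecutor.shutdown();
    }
}
```

The key point is only that the blocking call runs on a dedicated executor and the result is applied afterwards; how Flink actually schedules the continuation back onto its RPC main thread is up to the implementation in the linked PR.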
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407351#comment-17407351 ]

Till Rohrmann commented on FLINK-22483:
---------------------------------------

Ui, if the reconstruction of the {{SharedStateRegistry}} takes so long, then we should indeed try to avoid this operation. Conceptually, if we don't change the set of {{CompletedCheckpoints}}, then there should be no need to recreate the {{SharedStateRegistry}}. Can you open a separate JIRA ticket for this problem, as it is a follow-up step for this ticket, [~Ming Li]? cc [~pnowojski]
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407055#comment-17407055 ]

ming li commented on FLINK-22483:
---------------------------------

Hi [~trohrmann], thanks for your reply. It seems the main purpose is to ensure isolation. I checked the previous issue: we create a new {{SharedStateRegistry}} to avoid asynchronous cleanup that may cause the reference counter to drop below 1. But if we reuse the same {{SharedStateRegistry}} and do not clear it, there seems to be no such problem.

In fact, in our production environment we discard part of the data and state so that only the failed tasks are restarted, but we found that registering the {{SharedStateRegistry}} may take several seconds (thousands of tasks and dozens of TB of state). When a large number of tasks fail at the same time, this may take several minutes (number of tasks * several seconds). Therefore, we are considering whether the recovery time can be reduced by neither re-registering nor clearing the {{SharedStateRegistry}}.
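To make the reference counting under discussion concrete (the "counter less than 1" problem that motivates recreating the registry, and the per-handle cost that makes re-registration slow), here is a toy sketch. {{ToyStateRegistry}} and its methods are hypothetical illustrations, not Flink's {{SharedStateRegistry}} API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy reference-counted registry. Each shared state handle (e.g. a RocksDB
// sst file reused across incremental checkpoints) carries a count; restoring
// re-registers every handle, so the cost scales with the number of handles.
public class ToyStateRegistry {
    private final Map<String, Integer> refCounts = new ConcurrentHashMap<>();

    // Called once per handle per completed checkpoint during registration.
    public void register(String handleKey) {
        refCounts.merge(handleKey, 1, Integer::sum);
    }

    // Asynchronous cleanup of a subsumed checkpoint decrements the count;
    // if cleanup raced with a stale registry, the count could fall below 1,
    // which is the hazard isolation by re-creation is meant to avoid.
    public int unregister(String handleKey) {
        return refCounts.merge(handleKey, -1, Integer::sum);
    }

    public int refCount(String handleKey) {
        return refCounts.getOrDefault(handleKey, 0);
    }

    public static void main(String[] args) {
        ToyStateRegistry registry = new ToyStateRegistry();
        // Two completed checkpoints sharing one file:
        registry.register("sst-001");
        registry.register("sst-001");
        // Subsuming the older checkpoint drops one reference but keeps the file alive:
        registry.unregister("sst-001");
        System.out.println("refCount=" + registry.refCount("sst-001"));
    }
}
```

With thousands of tasks each contributing many handles, the `register` loop alone explains the seconds-per-restore cost described above, independent of any I/O.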
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406761#comment-17406761 ]

Till Rohrmann commented on FLINK-22483:
---------------------------------------

Hi [~Ming Li], I think we do create a new {{SharedStateRegistry}} because we want to isolate the new instance from potential asynchronous cleanup tasks that might change the old instance. Moreover, creating the {{SharedStateRegistry}} should not be very costly and does not involve blocking I/O operations, if I am not mistaken.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406647#comment-17406647 ]

ming li commented on FLINK-22483:
---------------------------------

Hi [~dmvk], [~edu05], could you please help answer a question of mine? If we only recover the {{CompletedCheckpointStore}} once at startup, why does the {{SharedStateRegistry}} need to be re-registered every time the job is restarted? Is it possible to register it only once at startup?
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396923#comment-17396923 ]

Eduardo Winpenny Tejedor commented on FLINK-22483:
--------------------------------------------------

Hi [~dmvk], sorry for taking so long to complete the review. I've done my best to review the PR and only left a minor comment. I can't say I understand the transition of checkpoints to the suspended state, and probably other aspects of their management. Admittedly, [~trohrmann] has done a much better job of reviewing it.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394602#comment-17394602 ]

Eduardo Winpenny Tejedor commented on FLINK-22483:
--------------------------------------------------

Hi [~dmvk], sure, I'll give it a look shortly.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17394090#comment-17394090 ] David Morávek commented on FLINK-22483: --- Hi [~edu05], if you'd have time to do a code review on [https://github.com/apache/flink/pull/16652], it would be helpful ;)
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391813#comment-17391813 ] Eduardo Winpenny Tejedor commented on FLINK-22483: --- Ok [~dmvk], it's a shame though. Let me know if I can assist at any stage with this story.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17388551#comment-17388551 ] David Morávek commented on FLINK-22483: --- After talking to [~rmetzger], we'll try to make things more robust on the Flink side, so we don't rely on the side effects of the `recover()` method. I need to give this some more thought, and to sync on it with [~Nicolaus Weidner], who started working on FLINK-23317.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387185#comment-17387185 ] Eduardo Winpenny Tejedor commented on FLINK-22483: --- Hmmm, how should we proceed [~dmvk]?
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387176#comment-17387176 ] David Morávek commented on FLINK-22483: --- After another pass on the issue: this has already been fixed for the `DefaultCompletedCheckpointStore` in FLINK-19596. The reported issue is actually related to a custom implementation of `CompletedCheckpointStore` that doesn't follow the same contract as `DefaultCompletedCheckpointStore`. `CompletedCheckpointStore#recover()` is currently poorly defined; we either need to align it with the `DefaultCompletedCheckpointStore` implementation or make an effort to properly define it for the various recovery scenarios.
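One way to keep the main thread responsive, in the spirit of the fix discussed in this thread, is to run the blocking recovery on a dedicated I/O executor and only hand the completed result back. The following is an illustrative sketch under that idea, not Flink's actual implementation; `blockingRecover` and the returned value are hypothetical stand-ins for `CompletedCheckpointStore.recover()` and its result:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RecoverOffMainThread {

    // Hypothetical stand-in for CompletedCheckpointStore.recover():
    // a potentially long-blocking storage call.
    static String blockingRecover() {
        try {
            Thread.sleep(100); // simulate a slow storage backend
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "latest-checkpoint";
    }

    // Runs the blocking call on the given I/O executor so the caller's
    // thread (the "main" RPC thread) is never blocked while it runs.
    static CompletableFuture<String> recoverAsync(ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(RecoverOffMainThread::blockingRecover, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        // The main thread stays free to serve RPCs while recovery runs;
        // here we simply wait for the result at the end.
        System.out.println("recovered: " + recoverAsync(ioExecutor).join());
        ioExecutor.shutdown();
    }
}
```

The design point is only where the blocking happens: the recovery work itself is unchanged, but RPCs no longer queue behind it.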
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381072#comment-17381072 ] Robert Metzger commented on FLINK-22483: --- It seems like the {{CheckpointStoreITCase.testRestartOnRecoveryFailure(CheckpointStoreITCase.java:93)}} test is hanging (if you scroll further up, you see that the "main" thread is stuck in this method). You can download the full logs of that CI run to get the output of the hanging test. Most likely, you'll see in the test what's going wrong.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380878#comment-17380878 ] Eduardo Winpenny Tejedor commented on FLINK-22483: --- Hi [~dmvk], you might have noticed the build is failing. I've tried rerunning it a couple of times, as it doesn't seem to be failing for any test failure or any other reason I can recognise. Maybe you could shed some light on this? This is the link to the pipeline: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20433=results] Here's an extract of the build logs I'm referring to as puzzling:
```
"G1 Concurrent Refinement Thread#22" os_prio=0 tid=0x7f5eb804e000 nid=0x4533 runnable
"G1 Concurrent Refinement Thread#23" os_prio=0 tid=0x7f5eb804c800 nid=0x4532 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x7f5eb82d2000 nid=0x4566 waiting on condition
JNI global references: 1539
=== WARNING: Killing task ===
/__w/3/s/tools/ci/watchdog.sh: line 100: 1182 Terminated watchdog
/__w/3/s/tools/ci/watchdog.sh: line 100: 1183 Terminated ( $cmd & PID=$!; echo $PID 1>&3; wait $PID; echo $? 1>&4 ) 3> $CMD_PID 4> $CMD_EXIT
1184 | tee $CMD_OUT
./tools/azure-pipelines/uploading_watchdog.sh: line 76: 277 Terminated $COMMAND
The STDIO streams did not close within 10 seconds of the exit event from process '/bin/bash'. This may indicate a child process inherited the STDIO streams and has not yet exited.
##[error]Bash exited with code '143'.
```
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379952#comment-17379952 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi [~dmvk], here's the PR! https://github.com/apache/flink/pull/16484
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379890#comment-17379890 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi [~dmvk], expect the PR at some point today :)
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379744#comment-17379744 ] David Morávek commented on FLINK-22483: --- Hi [~edu05], how is the PR going? We'd love to see this issue fixed in the 1.14 release. Feel free to let me know if you don't currently have time for this; I'd be happy to take over.
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377524#comment-17377524 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Understood [~dmvk], thanks for the detailed info. I'll make the changes as suggested and raise a proper PR.
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377386#comment-17377386 ] David Morávek commented on FLINK-22483: --- As for [~sewen]'s comment, I'll create a separate issue for lowering the {{CompletedCheckpointStore}} memory pressure, so we can resolve this one faster.
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377382#comment-17377382 ] David Morávek commented on FLINK-22483: --- Hi [~edu05], let me take over here from [~trohrmann] ;) In general, your approach looks good and does what Till described :+1: A few observations from my side:
1) We already construct `DefaultCompletedCheckpointStore` in the correct thread, inside the suggested `internalCreateJobMasterService`, just deeper down the stack.
2) A call to `DefaultCompletedCheckpointStore#recover()` basically does two things:
- Synchronizes all checkpoint pointers with the in-memory state (the "completed checkpoint queue")
- If there is any change in the state, fetches the actual checkpoint from the state handle (S3, HDFS, ...)
The latter is the actual "heavy lifting" that can block the RPC thread for a non-trivial amount of time (synchronizing pointers is not free either, but I guess the 80/20 rule applies here).
3) The call to `DefaultCompletedCheckpointStore#recover()` is idempotent.
That being said, you've correctly called `recover()` to warm up the checkpoint store, but I think you don't have to deal with all the hassle of passing the `CompletedCheckpointStore` through the constructor chain. Maybe you can just call `recover()` after you construct the store in `org.apache.flink.runtime.scheduler.SchedulerUtils#createCompletedCheckpointStoreIfCheckpointingIsEnabled`? Best, D.
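The core idea behind the comments above — that a blocking `recover()` must not run on the RPC main thread — can be sketched as follows. This is a hedged, minimal illustration, not Flink's actual implementation: the `CheckpointStore` interface and `warmUp` helper here are hypothetical stand-ins, and the real fix lives in Flink's JobMaster/scheduler construction path.

```java
import java.util.concurrent.*;

public class RecoverOffMainThread {
    /** Hypothetical stand-in for CompletedCheckpointStore#recover(): blocking and idempotent. */
    interface CheckpointStore {
        void recover() throws Exception;
    }

    /**
     * Runs the potentially blocking recover() on an I/O executor and returns a
     * future, so the caller (think: the JobMaster's RPC main thread) stays
     * responsive while the store talks to a possibly unavailable backend.
     */
    static CompletableFuture<Void> warmUp(CheckpointStore store, Executor ioExecutor) {
        return CompletableFuture.runAsync(() -> {
            try {
                store.recover();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, ioExecutor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch backendAvailable = new CountDownLatch(1);

        // Simulate a recover() that blocks until the storage backend responds.
        CheckpointStore store = backendAvailable::await;

        CompletableFuture<Void> recovery = warmUp(store, io);
        // The "main thread" is free even though recover() is still blocked.
        System.out.println("main thread free: " + !recovery.isDone());

        backendAvailable.countDown(); // backend becomes reachable
        recovery.join();
        System.out.println("recovered: " + recovery.isDone());
        io.shutdown();
    }
}
```

Because `recover()` is idempotent (observation 3 above), warming the store up eagerly after construction is safe even if a later code path calls it again.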
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375056#comment-17375056 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Awesome! Let me know what you think, [~trohrmann].
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17374740#comment-17374740 ] Till Rohrmann commented on FLINK-22483: --- Sorry for the lack of responsiveness [~edu05]. I'll try to take a look at your proposal this week. We should try to solve this problem for the {{1.14}} release.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372174#comment-17372174 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi [~rmetzger], [~trohrmann], could one of you follow up on this thread? If this is a low-priority item I can always switch to another one, but I'd rather stick with this now that we've got the ball rolling. Thanks.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368536#comment-17368536 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi [~trohrmann], I'm hoping the gist of the idea is captured in [https://github.com/edu05/flink/commit/4e79d531e34cbb6857472fbd2c8a53107fc23847]. The call to recover has been lifted to the {{DefaultJobMasterServiceFactory}}. If it doesn't need to be moved elsewhere, that location might be suitable. [~sewen]'s comment is still left to do, but I wanted to see if we were thinking along the same lines.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364747#comment-17364747 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- [~trohrmann] that's right, but the {{CheckpointCoordinator}} gets called from multiple places, and those in turn get called from multiple other places. They all end up invoking {{CompletedCheckpointStore::recover}} somewhere in the call stack. Should those stay unchanged?
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364733#comment-17364733 ] Till Rohrmann commented on FLINK-22483: --- Where else is {{CompletedCheckpointStore::recover}} being called from? I think it is only called by the {{CheckpointCoordinator}} atm.
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364607#comment-17364607 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- OK [~trohrmann], I see what you're saying. That can be made to work for the {{JobMaster}}'s invocation of {{CompletedCheckpointStore::recover}}, but what about all the other invocations? That method gets called from various other places; is the idea to change those too? If so, where is the {{CompletedCheckpointStore::recover}} call meant to be moved for those?
[jira] [Commented] (FLINK-22483) Recover checkpoints when JobMaster gains leadership
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17362854#comment-17362854 ] Till Rohrmann commented on FLINK-22483: --- Hi [~edu05], I think one idea could be to move the {{CompletedCheckpointStore::recover}} call out of the {{CheckpointCoordinator}}. In some sense, we would impose a new invariant which says that the {{CompletedCheckpointStore}} always contains the recovered {{Checkpoints}} when it is given to the {{CheckpointCoordinator}}. If this holds, then you can remove the {{recover}} call inside of the {{CheckpointCoordinator}}. Now, the question is when to do the call. Here, I would suggest doing it in {{DefaultJobMasterServiceFactory.internalCreateJobMasterService}}, for example.
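The proposal above can be sketched with plain JDK concurrency types. All names below ({{InMemoryStore}}, {{Coordinator}}, {{JobMasterServiceFactory}}) are hypothetical stand-ins for the Flink classes being discussed, not Flink's actual API: the store is recovered on an I/O executor before it is handed to the coordinator, so the coordinator can rely on the invariant and the RPC main thread never blocks.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical stand-in for Flink's CompletedCheckpointStore; a sketch only.
interface CompletedCheckpointStore {
    void recover() throws Exception;          // potentially slow, blocking I/O
    List<String> getAllCheckpoints();
}

// Toy store whose recover() fills in two checkpoints.
class InMemoryStore implements CompletedCheckpointStore {
    private final List<String> checkpoints = new ArrayList<>();
    @Override public void recover() { checkpoints.addAll(Arrays.asList("chk-1", "chk-2")); }
    @Override public List<String> getAllCheckpoints() { return checkpoints; }
}

// Invariant: the store passed to the constructor has ALREADY been recovered,
// so no recover() call is needed (or allowed) inside the coordinator.
class Coordinator {
    private final CompletedCheckpointStore store;
    Coordinator(CompletedCheckpointStore store) { this.store = store; }
    String latestCheckpoint() {
        List<String> all = store.getAllCheckpoints();
        return all.isEmpty() ? null : all.get(all.size() - 1);
    }
}

class JobMasterServiceFactory {
    private final Executor ioExecutor;
    JobMasterServiceFactory(Executor ioExecutor) { this.ioExecutor = ioExecutor; }

    // Recovery runs on the I/O executor, keeping the main thread responsive;
    // the coordinator is built only after recovery has completed.
    CompletableFuture<Coordinator> createCoordinator(CompletedCheckpointStore store) {
        return CompletableFuture
                .supplyAsync(() -> {
                    try {
                        store.recover();
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                    return store;
                }, ioExecutor)
                .thenApply(Coordinator::new);
    }
}
```

Failing the returned future, rather than throwing from the main thread, also gives the caller a natural place to react to an unreachable storage backend.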
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17362667#comment-17362667 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi [~trohrmann], sorry for the delay; it has taken me a while to find some spare time for this one. Here's an update. I have a couple of ideas for how this could be done: either move the handling of the {{DefaultCompletedCheckpointStore}} into the {{DefaultJobMasterServiceFactory}}, as you suggested, or move the checkpoint recovery into a function close to where it already lives but call it only from {{DefaultJobMasterServiceFactory}}. I'd like to clarify a couple of details before committing to either solution. The stack trace in the ticket is indeed a call to {{CompletedCheckpointStore::recover}}, but that call seems to already happen [asynchronously|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L259]. I think the call that needs lifting and shifting is [this one|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L134], which does get invoked from the {{JobMaster}} constructor. Am I on the right track here? The call to {{CompletedCheckpointStore::recover}} lies deep beneath many layers of calls; how many of those wrapping layers need to be shifted into the {{DefaultJobMasterServiceFactory}}? Moving only the call to {{CompletedCheckpointStore::recover}} wouldn't make sense without also moving [this whole block|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L129].
{code:java}
final CheckpointCoordinator checkpointCoordinator =
        newExecutionGraph.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
    // check whether we find a valid checkpoint
    if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
            new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
        // check whether we can restore from a savepoint
        tryRestoreExecutionGraphFromSavepoint(
                newExecutionGraph, jobGraph.getSavepointRestoreSettings());
    }
}
{code}
Again, let me know if I'm not on the right track. Finally, where exactly would the code need to be shifted so that it doesn't interfere with the intended execution of the program? Before the instantiation of the {{JobMaster}}? After {{JobMaster::start}}? Anywhere in between would still be blocking, wouldn't it? Possibly use a separate {{Executor}} for that? Hope to get cracking on this one soon!
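A minimal sketch of the "separate {{Executor}}" idea, using only JDK types. All names here ({{CheckpointStore}}, {{recoverLatest}}, {{AsyncRecoverySketch}}) are illustrative stand-ins, not Flink's actual API: the potentially blocking recover call runs on a dedicated I/O executor, leaving the caller's main thread free to answer RPCs while storage is unreachable.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/** Illustrative stand-in for Flink's CompletedCheckpointStore; not the real API. */
interface CheckpointStore {
    /** Potentially blocking: may retry against an unavailable storage backend. */
    Optional<String> recoverLatest() throws Exception;
}

public class AsyncRecoverySketch {

    /**
     * Runs the blocking recover call on a dedicated I/O executor so the
     * caller's thread (e.g. the RPC main thread) is never blocked on storage.
     */
    static CompletableFuture<Optional<String>> recoverAsync(
            CheckpointStore store, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return store.recoverLatest();
            } catch (Exception e) {
                // surface checked exceptions through the future
                throw new CompletionException(e);
            }
        }, ioExecutor);
    }

    public static void main(String[] args) throws Exception {
        Executor io = Executors.newSingleThreadExecutor();
        // a store that succeeds immediately, standing in for a slow backend
        CheckpointStore store = () -> Optional.of("checkpoint-42");
        System.out.println(recoverAsync(store, io).get().orElse("none"));
        System.exit(0); // shut down the non-daemon executor thread
    }
}
```

The point of the sketch is only the threading: whatever component awaits the future decides what "after recovery" means (construct the scheduler, restart tasks, etc.).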
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346460#comment-17346460 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Thanks [~trohrmann], I'll give this a try.
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17346143#comment-17346143 ] Till Rohrmann commented on FLINK-22483: --- Thanks for volunteering [~edu05]. The problem is not trivial, so it would be great to first see your solution idea. A pointer to get started: the {{DefaultJobMasterServiceFactory}} creates the {{JobMaster}} instance. If the {{DefaultCompletedCheckpointStore}} recovered the checkpoints in {{DefaultJobMasterServiceFactory.internalCreateJobMasterService}}, then it would not block later. One part of this is to move the management of the {{DefaultCompletedCheckpointStore}} into the {{JobMaster}}.
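The sequencing suggested here, recover first, then construct, can be sketched with plain JDK types. All class names below ({{RecoveredState}}, {{JobMasterStub}}, {{FactorySketch}}) are hypothetical stand-ins for the Flink classes mentioned above, not the real implementation: the blocking recovery runs on an I/O pool, and the job master is only constructed from already-recovered state, so its constructor never touches storage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the state recovered from the checkpoint store. */
class RecoveredState {
    final String latest;
    RecoveredState(String latest) { this.latest = latest; }
}

/** Hypothetical stand-in for JobMaster: its constructor no longer blocks on I/O. */
class JobMasterStub {
    final RecoveredState state;
    JobMasterStub(RecoveredState state) { this.state = state; }
}

public class FactorySketch {

    /**
     * Mirrors the idea behind internalCreateJobMasterService: perform the
     * (blocking) checkpoint recovery on an I/O executor first, and only then
     * construct the job master from the recovered state.
     */
    static CompletableFuture<JobMasterStub> createJobMasterService(Executor ioExecutor) {
        return CompletableFuture
                .supplyAsync(() -> new RecoveredState("chk-7"), ioExecutor) // blocking recover() would go here
                .thenApply(JobMasterStub::new);                             // cheap, non-blocking construction
    }

    public static void main(String[] args) throws Exception {
        System.out.println(
                createJobMasterService(Executors.newSingleThreadExecutor())
                        .get().state.latest);
        System.exit(0); // shut down the non-daemon executor thread
    }
}
```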
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17345584#comment-17345584 ] Eduardo Winpenny Tejedor commented on FLINK-22483: -- Hi, I'm willing to take this story, with a little further explanation please. I haven't found a {{JobManager}} class as such in the source code. I am clear on the {{CompletedCheckpointStore}} only needing to keep one checkpoint.
[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17333103#comment-17333103 ] Stephan Ewen commented on FLINK-22483: -- I think this is a good idea. I would add, though, that we should then change the {{CompletedCheckpointStore}} to keep only one checkpoint, because a {{CompletedCheckpoint}} object can have a large memory footprint if there is a lot of in-line state, so keeping multiple checkpoints can cause problems. We also only need the latest checkpoint/savepoint: even with the "preferCheckpoint" flag, or future changes that might skip over savepoints during recovery, the store only needs to keep the latest checkpoint that will be used for recovery.
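The "keep only one checkpoint" behavior amounts to a store with a retention bound of one: adding a newer checkpoint evicts the older one, so at most one potentially large checkpoint object is held in memory. A minimal sketch, with hypothetical names ({{LatestOnlyCheckpointStore}} is not Flink's real class):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/**
 * Illustrative single-slot completed-checkpoint store: adding a newer
 * checkpoint evicts the older one, bounding the in-memory footprint.
 */
class LatestOnlyCheckpointStore<T> {
    private static final int MAX_RETAINED = 1;
    private final Deque<T> checkpoints = new ArrayDeque<>();

    synchronized void addCheckpoint(T checkpoint) {
        checkpoints.addLast(checkpoint);
        while (checkpoints.size() > MAX_RETAINED) {
            checkpoints.removeFirst(); // evict older checkpoints
        }
    }

    /** The single checkpoint that would be used for recovery, if any. */
    synchronized Optional<T> latest() {
        return Optional.ofNullable(checkpoints.peekLast());
    }
}

public class LatestOnlyDemo {
    public static void main(String[] args) {
        LatestOnlyCheckpointStore<String> store = new LatestOnlyCheckpointStore<>();
        store.addCheckpoint("chk-1");
        store.addCheckpoint("chk-2"); // chk-1 is evicted here
        System.out.println(store.latest().orElse("none"));
    }
}
```

The deque plus a {{MAX_RETAINED}} constant keeps the sketch close to a configurable-retention store while still expressing the "latest only" policy from the comment.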