[jira] [Commented] (FLINK-34526) Actively disconnect the killed TM in RM to reduce restart time
[ https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830415#comment-17830415 ]

Rui Fan commented on FLINK-34526:
---------------------------------

Merged to master (1.20) via: a54ee77a64c2055bbdb187f0c7d12c8a0210f96c

> Actively disconnect the killed TM in RM to reduce restart time
> --------------------------------------------------------------
>
>                 Key: FLINK-34526
>                 URL: https://issues.apache.org/jira/browse/FLINK-34526
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: junzhong qin
>            Assignee: junzhong qin
>            Priority: Not a Priority
>              Labels: pull-request-available
>         Attachments: image-2024-02-27-15-50-25-071.png, image-2024-02-27-15-50-39-337.png
>
>
> In our test case, the pipeline is:
> !image-2024-02-27-15-50-25-071.png!
> # parallelism = 100
> # taskmanager.numberOfTaskSlots = 2
> # disable checkpoint
>
> h3. Phenomenon
> The worker was killed at 2024-02-27 15:10:13,691:
> {code:java}
> 2024-02-27 15:10:13,691 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e2472_1706081484717_60538_01_50 is terminated. Diagnostics: Container container_e2472_1706081484717_60538_01_50 marked as failed. Exit code:137. Diagnostics:[2024-02-27 15:10:12.720]Container killed on request. Exit code is 137[2024-02-27 15:10:12.763]Container exited with a non-zero exit code 137. [2024-02-27 15:10:12.839]Killed by external signal
> {code}
> It took about 20 seconds to restart the job:
> {code:java}
> 2024-02-27 15:10:30,749 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_0) switched from RUNNING to FAILED on container_e2472_1706081484717_60538_01_50 @ xxx (dataPort=38597).
> org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id container_e2472_1706081484717_60538_01_50(xxx:5454) is no longer reachable.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1515)
>     at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> // Deploy and run task
> 2024-02-27 15:10:32,426 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) switched from DEPLOYING to INITIALIZING.
> 2024-02-27 15:10:32,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: datagen_source[1] -> Sink: print_sink[2] (69/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) switched from DEPLOYING to INITIALIZING.
> 2024-02-27 15:10:33,347 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: datagen_source[1] -> Sink: print_sink[2] (70/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_69_1) switched from INITIALIZING to RUNNING.
> 2024-02-27 15:10:33,421 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: datagen_source[1] -> Sink: print_sink[2] (69/100) (2a1d06e6610bc499475fa6e647f8cac9_d3f21cabc6fe0fdf76c8be915bdb22a2_68_1) switched from INITIALIZING to RUNNING.
> {code}
>
> h3. Reason
> When the RM received the message that the TM had been killed, the JobMaster still kept its connection to the killed TM. The JobMaster only found that the TM was no longer reachable about 17 seconds later.
>
> h3. Solution:
> We can reduce the restart time by having the ResourceManager actively disconnect the killed TaskManager (disconnectTaskManager):
> {code:java}
> // class ResourceManager
> protected Optional<WorkerType> closeTaskManagerConnection(
>         final ResourceID resourceID, final Exception cause) {
>     //
>     // using JobManagerGateway to actively disconnect TaskManager
>     workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
> }
> {code}
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
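The "about 17 seconds" and "about 20 seconds" figures in the description can be checked directly against the quoted log timestamps. The following Java sketch (the class `RestartTimeline` and its `between` helper are illustrative names, not part of Flink) parses timestamps in the `yyyy-MM-dd HH:mm:ss,SSS` form used by the quoted log lines and measures the interval from the RM's "Worker ... is terminated" entry to the task failure, and to the last quoted subtask reaching RUNNING.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative helper (not part of Flink) for checking the FLINK-34526 timeline.
final class RestartTimeline {
    // Timestamp format of the quoted log lines, e.g. "2024-02-27 15:10:13,691".
    private static final DateTimeFormatter LOG_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Elapsed time between two log timestamps. */
    static Duration between(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TIMESTAMP),
                LocalDateTime.parse(to, LOG_TIMESTAMP));
    }

    public static void main(String[] args) {
        String workerTerminated = "2024-02-27 15:10:13,691"; // RM: "Worker ... is terminated."
        String taskFailed = "2024-02-27 15:10:30,749";       // JM: TM "is no longer reachable"
        String lastRunning = "2024-02-27 15:10:33,421";      // last quoted subtask back in RUNNING

        System.out.println("detection delay: "
                + between(workerTerminated, taskFailed).toMillis() + " ms");
        System.out.println("total restart: "
                + between(workerTerminated, lastRunning).toMillis() + " ms");
    }
}
```

Run as-is, it prints `detection delay: 17058 ms` and `total restart: 19730 ms`, matching the description. Note that 15:10:33,421 is only the last RUNNING transition in the excerpt, not necessarily the last of all 100 subtasks.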
[jira] [Commented] (FLINK-34526) Actively disconnect the killed TM in RM to reduce restart time
[ https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823017#comment-17823017 ]

junzhong qin commented on FLINK-34526:
--------------------------------------

[~xtsong] Thanks for the detailed information. I will prepare the PR.
[jira] [Commented] (FLINK-34526) Actively disconnect the killed TM in RM to reduce restart time
[ https://issues.apache.org/jira/browse/FLINK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821640#comment-17821640 ]

Xintong Song commented on FLINK-34526:
--------------------------------------

This makes sense to me. Just to provide some background on this: Flink's JM, RM and TM coordinate based on the assumption that the ground truth of how TMs' resources are allocated for JMs lies with the TMs. Therefore, to avoid inconsistency, the RM neither tells a JM which resource (slot) from which TM is (de)allocated for it, nor tells a JM to connect to or disconnect from a TM. However, IMHO this might be a bit overprotective. If a TM is known for sure to be terminated, I don't see any problem in notifying the relevant JMs about it earlier.
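The idea in this comment can be illustrated with a toy Java model. This is a sketch under stated assumptions: `ToyResourceManager`, `ToyJobMaster`, `JobMasterEndpoint` and `onWorkerTerminated` are hypothetical names, not Flink classes, and this is not the merged implementation. The RM reacts only to a definite termination signal from the resource provider and forwards it to every registered JobMaster; each JobMaster decides for itself whether the TM is one of its own, so the RM never needs to know which slots belong to which job.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of "RM actively disconnects a terminated TM";
// none of these types are Flink classes.
final class ActiveDisconnectSketch {

    /** Hypothetical stand-in for the JobMaster RPC the RM would call. */
    interface JobMasterEndpoint {
        void disconnectTaskManager(String resourceId, Exception cause);
    }

    /** Toy JobMaster that tracks the TaskManagers it is connected to. */
    static final class ToyJobMaster implements JobMasterEndpoint {
        final Set<String> connectedTaskManagers = new HashSet<>();

        @Override
        public void disconnectTaskManager(String resourceId, Exception cause) {
            // The JobMaster stays the judge of its own connections: an unknown TM is ignored.
            if (connectedTaskManagers.remove(resourceId)) {
                // A real JobMaster would fail the tasks on this TM here and start failover.
                System.out.println("disconnected " + resourceId + ": " + cause.getMessage());
            }
        }
    }

    /** Toy ResourceManager that knows which JobMasters are registered with it. */
    static final class ToyResourceManager {
        private final Map<String, JobMasterEndpoint> registeredJobMasters = new LinkedHashMap<>();

        void registerJobMaster(String jobId, JobMasterEndpoint endpoint) {
            registeredJobMasters.put(jobId, endpoint);
        }

        // Called only when the resource provider reports the worker as terminated,
        // i.e. when the TM is known for sure to be gone (not on a missed heartbeat).
        void onWorkerTerminated(String resourceId, Exception cause) {
            for (JobMasterEndpoint jobMaster : registeredJobMasters.values()) {
                jobMaster.disconnectTaskManager(resourceId, cause);
            }
        }
    }

    public static void main(String[] args) {
        ToyJobMaster jobA = new ToyJobMaster();
        jobA.connectedTaskManagers.add("tm-50");
        ToyJobMaster jobB = new ToyJobMaster();
        jobB.connectedTaskManagers.add("tm-7");

        ToyResourceManager rm = new ToyResourceManager();
        rm.registerJobMaster("job-a", jobA);
        rm.registerJobMaster("job-b", jobB);

        rm.onWorkerTerminated("tm-50", new Exception("Worker tm-50 is terminated."));
        // Prints "disconnected tm-50: Worker tm-50 is terminated." (from jobA only).
    }
}
```

Broadcasting to all registered JobMasters is one simple way to respect the principle that TMs own the allocation ground truth; a real implementation might narrow the set of JobMasters it notifies. Either way, the JobMaster learns about the loss as soon as the RM does, instead of after repeated heartbeat RPC failures.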