[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17396691#comment-17396691 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

These are great results. Thanks a lot for improving the deployment speed [~Thesharing] and [~zhuzh] :-)

> Creation of TaskDeploymentDescriptor can block main thread for long time
> ------------------------------------------------------------------------
>
>                 Key: FLINK-16069
>                 URL: https://issues.apache.org/jira/browse/FLINK-16069
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: huweihua
>            Priority: Major
>         Attachments: FLINK-16069-POC-results, batch.png, streaming.png
>
>
> The deployment of tasks takes a long time when we submit a high-parallelism
> job. Execution#deploy runs in the main thread, so it blocks the JobMaster from
> processing other akka messages, such as heartbeats. The creation of the
> TaskDeploymentDescriptor takes most of the time. We can move the creation into a future.
> For example, for a job [source(8000)->sink(8000)], moving all 16000 tasks from
> SCHEDULED to DEPLOYING took more than 1 min. This caused the TaskManager
> heartbeats to time out, and the job never succeeded.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395728#comment-17395728 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Thanks for making the improvements and sharing the results! [~Thesharing]
The attached graph shows the improvement on TaskDeploymentDescriptor creation. And the table shows the E2E deployment improvement.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393869#comment-17393869 ]

Zhilong Hong commented on FLINK-16069:
--------------------------------------

After the optimization we've done in FLINK-23005, the performance of the task deployment has improved. In our experiment, for a large-scale streaming word count job, the speed of task deployment is 6 times faster than before. The result is illustrated below:

||Parallelism||Before||After||
|8000*8000|32.611s|6.480s|
|16000*16000|128.408s|19.051s|

The improvement is also shown in the benchmark implemented in FLINK-20612:

!streaming.png|width=800!

!batch.png|width=800!

In our opinion, this ticket can be closed for now.
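As a quick check of the table in the comment above, the speedup factors can be recomputed from the reported timings. This is plain arithmetic on the numbers in the comment and not part of Flink; the class and method names are made up for illustration. The ratios come out at roughly 5x for 8000*8000 and a bit under 7x for 16000*16000.

```java
import java.util.Locale;

/** Recomputes speedup factors from the timings in the table above (illustrative names). */
final class DeploymentSpeedup {

    /** Time before the optimization divided by the time after it. */
    static double speedup(double beforeSeconds, double afterSeconds) {
        if (afterSeconds <= 0) {
            throw new IllegalArgumentException("afterSeconds must be positive");
        }
        return beforeSeconds / afterSeconds;
    }

    public static void main(String[] args) {
        // Timings in seconds, copied from the table in the comment.
        System.out.println(String.format(Locale.ROOT, "8000*8000:   %.2fx", speedup(32.611, 6.480)));
        System.out.println(String.format(Locale.ROOT, "16000*16000: %.2fx", speedup(128.408, 19.051)));
    }
}
```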
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354757#comment-17354757 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Even if the main thread can have the highest priority, GC problems can still happen when serialized {{TaskDeploymentDescriptor}}s are generated too fast and queued to be sent out. These queued {{TaskDeploymentDescriptor}}s will cost lots of memory and cannot be {{GC}}ed before they are sent out. Frequent young GC would consume lots of CPU, and either more heap memory will be required or consecutive full GCs could happen.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354477#comment-17354477 ]

Chesnay Schepler commented on FLINK-16069:
------------------------------------------

Have we ever experimented with thread priorities such that the main threads have the highest priority? Then the question of how many threads the serializationExecutor gets becomes irrelevant.
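For readers unfamiliar with the mechanism, the thread-priority idea raised above could be tried with a custom thread factory along the following lines. This is only a sketch: {{PriorityThreadFactory}} is a hypothetical helper, not a Flink class, and Java thread priorities are merely a hint to the OS scheduler, so the effect may be small or absent on some platforms.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread factory that creates daemon threads with a fixed priority. */
final class PriorityThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final int priority;
    private final AtomicInteger counter = new AtomicInteger();

    PriorityThreadFactory(String namePrefix, int priority) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("priority out of range: " + priority);
        }
        this.namePrefix = namePrefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
        thread.setDaemon(true);
        // Only a scheduling hint; the OS may ignore it.
        thread.setPriority(priority);
        return thread;
    }

    public static void main(String[] args) {
        // Assumed split: RPC/main threads high, serialization workers low.
        ThreadFactory rpcThreads = new PriorityThreadFactory("rpc", Thread.MAX_PRIORITY);
        ThreadFactory workerThreads = new PriorityThreadFactory("serialization", Thread.MIN_PRIORITY);
        System.out.println("rpc priority: " + rpcThreads.newThread(() -> {}).getPriority());
        System.out.println("worker priority: " + workerThreads.newThread(() -> {}).getPriority());
    }
}
```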
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354434#comment-17354434 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Maybe one could start by taking {{#cpus/2}} as a default value.

Alright, caching the shuffle descriptors for all-to-all connections makes sense. I guess this could reduce the load quite a bit.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354420#comment-17354420 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Yes, a dedicated {{serializationExecutor}} is an alternative. One thing I'm a bit unsure about is how to find a proper value for the number of threads. A large value may lead to queued {{shuffleDescriptors}}, which can affect the GC. A small value may make {{TaskManagerGateway#submitTask()}} the bottleneck of the deployment and affect deployment performance.

Large {{shuffleDescriptors}} are generated in the All-to-All connection case. In this case the {{shuffleDescriptors}} can be shared among all executions of the same job vertex if we add a cache for them. In the pointwise case, the {{shuffleDescriptors}} will be small and not shipped via blobs.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354373#comment-17354373 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

This is a good finding [~zhuzh]. I like the ideas. Another idea could be to introduce a dedicated {{serializationExecutor}} which is used for the {{submitTask}} call and which has a limited number of threads. That way we isolate the CPU-intensive parts a bit.

Are the {{shuffleDescriptors}} {{Execution}} specific? If yes, then we will have a lot of IO operations for writing the blobs when deploying the {{Executions}}. This might lead to other problems if we write the files from the main thread.
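The dedicated executor with a limited number of threads suggested above could look roughly like the sketch below. It is not Flink code: {{serialize}} is a hypothetical stand-in for the CPU-intensive serialization, the descriptor is modelled as a plain string, and the thread count of 2 is an arbitrary placeholder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: isolate CPU-heavy serialization on a small, dedicated thread pool. */
final class SerializationExecutorSketch {

    /** Hypothetical stand-in for the CPU-intensive serialization of one descriptor. */
    static byte[] serialize(String descriptor) {
        return descriptor.getBytes(StandardCharsets.UTF_8);
    }

    /** Runs the serialization on the given executor instead of a large shared pool. */
    static CompletableFuture<byte[]> serializeAsync(String descriptor, Executor serializationExecutor) {
        return CompletableFuture.supplyAsync(() -> serialize(descriptor), serializationExecutor);
    }

    public static void main(String[] args) {
        // The thread count is an assumption for the demo, not a recommendation.
        ExecutorService serializationExecutor = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<byte[]>> pending = new ArrayList<>();
            for (int i = 0; i < 8; i++) {
                pending.add(serializeAsync("descriptor-" + i, serializationExecutor));
            }
            int totalBytes = 0;
            for (CompletableFuture<byte[]> future : pending) {
                totalBytes += future.join().length;
            }
            System.out.println("serialized bytes: " + totalBytes);
        } finally {
            serializationExecutor.shutdown();
        }
    }
}
```

Because at most two threads ever run {{serialize}}, the remaining cores stay available to other threads; the trade-off, as discussed in the thread, is that too small a pool can itself become the deployment bottleneck.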
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17352943#comment-17352943 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Thanks for the suggestion and sorry for the late reply! [~trohrmann]

I have some findings on the cause of the heartbeat timeout. The cause looks to be that the serialization for {{submitTask()}} via the future executors overwhelmed the JVM and the machine. When the problem happens, there are lots of future executor threads (in our case 96, which equals the number of CPU cores of the machines) busy doing the serialization, which makes it hard for the main thread to get enough CPU time slices to handle the RPCs. Besides that, the akka threads are not able to handle the generated {{submitTask}} messages in time. Lots of such large messages queued in memory lead to GC problems, which add further load to the CPU.

I tried decreasing the future executor thread pool size to 4 and the heartbeat timeout problem is gone.

I think the future executor thread pool size is a bit too large if we expect these threads to do CPU-intensive work. Currently it is hardcoded to be all the available cores of the JVM (via {{Runtime.getRuntime().availableProcessors()}}), and in many cases that is also all the cores of the machine. Maybe we can add a configurable max limit to it.

Another option, proposed by [~Thesharing], is to optimize the process of {{submitTask()}} by shipping the {{shuffleDescriptors}} via the blob service. This could relieve the heavy computation of {{submitTask()}} in the future executor threads and could also speed up the task submission. He is working on a PoC at the moment and will share the result soon.
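The "configurable max limit" on the future executor pool size mentioned above amounts to a simple cap on the core count. A minimal sketch, assuming a hypothetical {{configuredMax}} setting (no such Flink option is implied here):

```java
/** Sketch: cap the future executor pool size instead of always using every core. */
final class FutureExecutorSizing {

    /**
     * Returns the pool size to use: the number of available cores, but never more
     * than the configured maximum. "configuredMax" is a hypothetical setting.
     */
    static int poolSize(int availableCores, int configuredMax) {
        if (availableCores < 1 || configuredMax < 1) {
            throw new IllegalArgumentException("both values must be at least 1");
        }
        return Math.min(availableCores, configuredMax);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // 4 mirrors the value tried in the comment above; it is not a general recommendation.
        System.out.println("pool size: " + poolSize(cores, 4));
    }
}
```

With the 96-core machines described in the comment and a cap of 4, this yields 4 threads; on a machine with fewer cores than the cap, the core count still wins.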
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312220#comment-17312220 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Maybe we should remove {{AkkaRpcService.getExecutor}} and {{AkkaRpcService.getScheduledExecutor}} to prevent this situation from happening.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312218#comment-17312218 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Could it be that we somehow use the {{ActorSystem}}'s dispatcher for some of these tasks? If this is the case, then it could explain why the dispatcher thread pool is starved. If not, then maybe we can increase the priority of the dispatcher thread pool's threads to give them more CPU cycles.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311960#comment-17311960 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

From what I can see, the heartbeat timeout happens because the scheduled heartbeat sending actions (HeartbeatManagerSenderImpl::run) are not executed in time.

By sampling jstacks while deploying tasks, I see that the JM main thread is not blocked in a particular operation and is processing incoming requests (mainly `heartbeatFromTaskManager` and `updateTaskExecutionState` (tasks switching to RUNNING)). However, sometimes in the sampled result there is no active JM main thread at all. So I also suspect that messages are not put into the JM actor's mailbox in time or the mailbox events are not dispatched in time.

Note that this problem happens during the deploying stage; at this time all the future-executor threads and akka remoting dispatcher threads are busy dealing with `submitTask` messages.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311563#comment-17311563 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Thanks for providing the feedback [~zhuzh]. The results already look quite promising :-) Do you have a suspicion which operation still blocks the main thread?
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291607#comment-17291607 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

I had a PoC to verify the idea of caching and reusing ShuffleDescriptors for the ALL-to-ALL connection pattern.
https://github.com/zhuzhurk/flink/commits/FLINK_16069_deployment_perf_improvement_poc3

With this change, the time to deploy tasks in the main thread for an 8000x8000 job can be reduced from ~90s to ~5s. This means the main thread can be relieved.
However, the E2E deployment performance does not have a significant improvement, because the bottleneck now becomes the submission of the TaskDeploymentDescriptors. It was 1min 40s before the improvement and is still 1min 30s after the improvement. (see key logs in attached {{FLINK-16069-POC-results}})

Theoretically, reducing the TDD creation time to unblock the main thread is good enough in the scope of this ticket. However, I also noticed that heartbeat timeout errors can sometimes happen with this improvement. So I still need some more time to look into why this timeout can happen and how to solve it.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291544#comment-17291544 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

This ticket has been touched by the backlog grooming service and seems to be still valid.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174053#comment-17174053 ]

Yumeng Zhang commented on FLINK-16069:
--------------------------------------

[~zhuzh] Yes, we encountered the same problem when deploying a job with a large number of tasks. It took a few minutes to finish deploying these tasks.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173885#comment-17173885 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Hi Yumeng, I think we have not reached a consensus.
I once did a PoC of the idea to "cache generated ShuffleDescriptors for ALL-to-ALL edges for reuse". But there was no further progress due to some other prioritized work.
I'd like to understand if this has become a serious and urgent problem for you. If it has, we can resume this discussion.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173036#comment-17173036 ]

Yumeng Zhang commented on FLINK-16069:
--------------------------------------

Hello guys, have you reached a consensus about how to optimize the creation of the TaskDeploymentDescriptor?
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045531#comment-17045531 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

>> Is there anything in the ShuffleDescriptor that would be different depending on who reads the intermediate result?

I think not. A ShuffleDescriptor does not depend on the consumer. It is created from the PartitionDescriptor and ProducerDescriptor when the producer is assigned a slot and the partition is registered with the ShuffleMaster.
That also means we cannot generate {{ShuffleDescriptor}}s before scheduling the tasks. (This answers "In general, can we not create the shuffle descriptor when the task is scheduled?", if I understand correctly.)
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045395#comment-17045395 ]

Stephan Ewen commented on FLINK-16069:
--------------------------------------

"each InputGateDeploymentDescriptor of B instances should contain the same 8000 ShuffleDescriptor"

Yes, that sounds like a good approach.

In general, can we not create the shuffle descriptor when the task is scheduled? Is there anything in the ShuffleDescriptor that would be different depending on who reads the intermediate result?
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045135#comment-17045135 ]

Zhu Zhu commented on FLINK-16069:
---------------------------------

Making the construction of the deployment descriptor more static does not seem easy. The critical path is {{TaskDeploymentDescriptorFactory#createInputGateDeploymentDescriptors() -> getConsumedPartitionShuffleDescriptors()}}, which generates the {{InputGateDeploymentDescriptor}}s for a task. This relies on {{ShuffleDescriptor}}s, which are created from the producer tasks' {{ResultPartitionDeploymentDescriptor}}s. A {{ResultPartitionDeploymentDescriptor}}, however, is created lazily, i.e. only after its producer has acquired a slot, because the slot determines the location.

Besides that, even if we could create the {{InputGateDeploymentDescriptor}} statically and update it with the {{ResultPartitionDeploymentDescriptor}} when scheduling, we would still not avoid the 8000x8000 complexity of computing the {{ShuffleDescriptor}}s.

I'm wondering whether we can add a shortcut for the ALL-to-ALL pattern by caching the generated {{ShuffleDescriptor}}s. For example, in the case A(parallelism=8000) -> B(parallelism=8000), the {{InputGateDeploymentDescriptor}} of every B instance should contain the same 8000 {{ShuffleDescriptor}}s, so there is no need to generate these 8000 {{ShuffleDescriptor}}s 8000 times.
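The caching shortcut proposed in this comment could look roughly like the sketch below. It is only an illustration under assumptions: {{Descriptor}}, {{descriptorsFor}} and the string result id are hypothetical stand-ins, not Flink classes or APIs, and the sketch presumes all producers already have their locations (which, as noted in the comment, is only true after slot allocation).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShuffleDescriptorCacheSketch {

    /** Hypothetical stand-in for a ShuffleDescriptor; not the Flink class. */
    static final class Descriptor {
        final int partitionIndex;

        Descriptor(int partitionIndex) {
            this.partitionIndex = partitionIndex;
        }
    }

    /** Counts how many descriptors were actually built, to make the saving visible. */
    int created = 0;

    /** One shared, read-only descriptor list per all-to-all intermediate result. */
    private final Map<String, List<Descriptor>> cache = new HashMap<>();

    /** Returns the shared list for a result, building it at most once. */
    List<Descriptor> descriptorsFor(String resultId, int producerParallelism) {
        return cache.computeIfAbsent(resultId, id -> {
            List<Descriptor> list = new ArrayList<>(producerParallelism);
            for (int i = 0; i < producerParallelism; i++) {
                list.add(new Descriptor(i));
                created++;
            }
            return Collections.unmodifiableList(list);
        });
    }

    public static void main(String[] args) {
        ShuffleDescriptorCacheSketch cache = new ShuffleDescriptorCacheSketch();
        // Every one of the 8000 consumers asks for the descriptors of result "A".
        for (int consumer = 0; consumer < 8000; consumer++) {
            cache.descriptorsFor("A", 8000);
        }
        System.out.println("descriptors created: " + cache.created);
    }
}
```

With the cache, the 8000 consumers share one list of 8000 descriptors instead of building 8000 x 8000 of them. Invalidation (for example when a producer is restarted and its location changes) is deliberately left out of the sketch.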
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045107#comment-17045107 ]

Yingjie Cao commented on FLINK-16069:
-------------------------------------

Maybe we can split the tasks to be deployed into multiple mini-batches and, instead of doing all the work in a single for loop, deploy the next batch asynchronously only after the future of the previous batch has completed. This may make the overall performance even worse, but no single piece of work would block the main thread for too long.
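The mini-batch idea from this comment might be sketched as follows. This is an assumption-laden illustration, not Flink code: {{deployInBatches}} and the use of a plain single-threaded executor as the "main thread" are hypothetical. Each batch is submitted as its own piece of work, so other messages queued on the same thread (such as heartbeats) can run between batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BatchedDeploymentSketch {

    /** Runs the deployments in batches; each batch is a separate task on mainThread. */
    static CompletableFuture<Void> deployInBatches(
            List<Runnable> deployments, int batchSize, Executor mainThread) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return deployFrom(deployments, 0, batchSize, mainThread);
    }

    private static CompletableFuture<Void> deployFrom(
            List<Runnable> deployments, int from, int batchSize, Executor mainThread) {
        if (from >= deployments.size()) {
            return CompletableFuture.completedFuture(null);
        }
        final int to = Math.min(from + batchSize, deployments.size());
        return CompletableFuture
                // One batch is one unit of work on the main thread.
                .runAsync(() -> deployments.subList(from, to).forEach(Runnable::run), mainThread)
                // The next batch is enqueued only after this one has finished.
                .thenCompose(ignored -> deployFrom(deployments, to, batchSize, mainThread));
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        List<Runnable> deployments = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            deployments.add(() -> System.out.println("deploying task " + id));
        }
        deployInBatches(deployments, 4, mainThread).join();
        mainThread.shutdown();
    }
}
```

The total work is unchanged and the extra scheduling adds overhead, which matches the caveat in the comment. The gain is only that the longest uninterrupted block on the main thread shrinks to one batch.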
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17041812#comment-17041812 ]

Stephan Ewen commented on FLINK-16069:
--------------------------------------

Is it possible to make more of the deployment descriptor "static", meaning that we could pre-compute it when the execution graph is created?
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039916#comment-17039916 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Whenever we access mutable state that belongs to the main thread (e.g. the {{ExecutionGraph}}), we must do so from the main thread. If this is not guaranteed, we potentially introduce a race condition.
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039191#comment-17039191 ]

huweihua commented on FLINK-16069:
----------------------------------

Hi [~trohrmann], thanks for your time.

Yes, it is the iteration over the input edges that takes so long. I had not thought much about the race condition. I tried creating the {{TaskDeploymentDescriptorFactory}} in the main thread and then moving {{createDeploymentDescriptor}} into a future. This reduces the time spent in the main thread to 2s.

Any suggestions are welcome.
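The split described in this comment (capture what is needed on the main thread, then build the descriptor in a future) might be sketched like this. All names are hypothetical stand-ins rather than the actual Flink classes, and the sketch assumes the factory takes an immutable copy of the state it needs, so that the asynchronous part never touches main-thread-owned objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SnapshotThenBuildSketch {

    /** Hypothetical mutable state owned by the main thread (stand-in for graph edges). */
    static final class MutableGraph {
        final List<String> inputEdges = new ArrayList<>();
    }

    /** Immutable copy taken on the main thread; safe to read from any thread. */
    static final class FactorySnapshot {
        private final List<String> inputEdges;

        FactorySnapshot(MutableGraph graph) {
            this.inputEdges = Collections.unmodifiableList(new ArrayList<>(graph.inputEdges));
        }

        /** The expensive part: iterates over every captured input edge. */
        String createDeploymentDescriptor() {
            StringBuilder sb = new StringBuilder("TDD[");
            for (String edge : inputEdges) {
                sb.append(edge).append(';');
            }
            return sb.append(']').toString();
        }
    }

    /** Must be called on the main thread; only the copy happens there. */
    static CompletableFuture<String> createAsync(MutableGraph graph, Executor worker) {
        FactorySnapshot snapshot = new FactorySnapshot(graph);
        return CompletableFuture.supplyAsync(snapshot::createDeploymentDescriptor, worker);
    }

    public static void main(String[] args) {
        MutableGraph graph = new MutableGraph();
        graph.inputEdges.add("edge-0");
        graph.inputEdges.add("edge-1");
        // Runnable::run is a direct executor, used here only to keep the demo short.
        System.out.println(createAsync(graph, Runnable::run).join());
    }
}
```

Note that the copy is still linear in the number of input edges, so some cost remains on the main thread, which may be one reason a residual delay like the reported 2s is still observed.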
[jira] [Commented] (FLINK-16069) Creation of TaskDeploymentDescriptor can block main thread for long time
[ https://issues.apache.org/jira/browse/FLINK-16069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038976#comment-17038976 ]

Till Rohrmann commented on FLINK-16069:
---------------------------------------

Hi [~huwh], do you know what exactly is taking so long? Is it the creation of the {{TaskDeploymentDescriptor}}s? If yes, is it the iteration over the input edges?

I think it is not as easy as moving the {{TaskDeploymentDescriptor}} creation into a future, because we access the {{ExecutionGraph}} through the passed result partitions. This means that in case of a concurrent recovery we might have a race condition where, for example, we read state from an {{Execution}} that has already been reset.
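One common way to guard against the stale read described in this comment is to remember an attempt number on the main thread and re-check it when the asynchronous result comes back. The sketch below is only an illustration of that general technique, not what Flink does: {{Execution}} here is a hypothetical stand-in, and {{deployAsync}} and the attempt counter are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class StaleResultGuardSketch {

    /** Hypothetical main-thread-owned state; reset() models a recovery. */
    static final class Execution {
        private int attemptNumber = 0;
        private String deployed = null;

        void reset() {
            attemptNumber++;
            deployed = null;
        }

        int attemptNumber() {
            return attemptNumber;
        }

        String deployed() {
            return deployed;
        }
    }

    /** Call on the main thread. The future yields false if the result was stale. */
    static CompletableFuture<Boolean> deployAsync(
            Execution execution, Executor worker, Executor mainThread) {
        final int attemptAtStart = execution.attemptNumber(); // read on the main thread
        return CompletableFuture
                // Expensive work uses only the captured value, never the Execution itself.
                .supplyAsync(() -> "descriptor-for-attempt-" + attemptAtStart, worker)
                // Back on the main thread: apply the result only if no reset happened.
                .thenApplyAsync(descriptor -> {
                    if (execution.attemptNumber() != attemptAtStart) {
                        return false;
                    }
                    execution.deployed = descriptor;
                    return true;
                }, mainThread);
    }

    public static void main(String[] args) {
        ArrayDeque<Runnable> worker = new ArrayDeque<>();
        ArrayDeque<Runnable> mainThread = new ArrayDeque<>();
        Execution execution = new Execution();
        CompletableFuture<Boolean> result = deployAsync(execution, worker::add, mainThread::add);
        execution.reset();        // a recovery happens while the descriptor is being built
        worker.poll().run();      // the worker finishes the now outdated descriptor
        mainThread.poll().run();  // the main thread notices the reset and discards it
        System.out.println("applied: " + result.join());
    }
}
```

The guard only stays safe as long as the off-thread step works purely on values captured on the main thread. Reading the mutable object itself from the worker would reintroduce the race.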