[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750078#comment-17750078 ]

RocMarshal commented on FLINK-31757:

Hi [~heigebupahei], thanks for your attention. We've updated the design doc to a new edition:
[https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8/edit?usp=sharing]
Would you mind having a look at the doc? Any suggestions are appreciated~

> Optimize Flink un-balanced tasks scheduling
> -------------------------------------------
>
>                 Key: FLINK-31757
>                 URL: https://issues.apache.org/jira/browse/FLINK-31757
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: RocMarshal
>            Assignee: RocMarshal
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-04-13-08-04-04-667.png
>
>
> Suppose we have a job with 21 {{JobVertex}} instances. The parallelism of
> vertex A is 100, and that of the others is 5. If each {{TaskManager}} has
> only one slot, then we need 100 TMs.
> There will be 5 slots with 21 sub-tasks each, and the others will have only
> one sub-task of A. Does this mean we have to make a trade-off between wasted
> resources and insufficient resources?
> From a resource utilization point of view, we expect all subtasks to be
> evenly distributed across the TMs.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748386#comment-17748386 ]

yuanfenghu commented on FLINK-31757:

Is there any progress on this topic? We are also facing this problem now, which is quite a headache.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730871#comment-17730871 ]

Rui Fan commented on FLINK-31757:

Hi [~xtsong] [~zhuzh] [~wanglijie], you may also be interested in this JIRA. Looking forward to your feedback, thanks~

BTW, I'm not sure whether it should be discussed as a FLIP, because:
* It may add a public option.
* It includes 2 parts, task-to-slot and slot-to-TM assignment, so it may be done in multiple subtasks.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727041#comment-17727041 ]

RocMarshal commented on FLINK-31757:

I have compiled a draft:
[https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8/edit?usp=sharing]
Looking forward to your discussion. [~fanrui] [~huwh] [~Weijie Guo] [~chesnay]
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715580#comment-17715580 ]

RocMarshal commented on FLINK-31757:

I am still working on this JIRA. Because of the Labor Day holiday, I will provide a design draft as soon as possible after it. Thanks a lot.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712173#comment-17712173 ]

Weihua Hu commented on FLINK-31757:

I would like to bring in a common scenario where tasks can't all be given the same parallelism.

Some ETL pipeline jobs consume Kafka data and then do some heavy transformation in a Map operation. In this scenario, we can't set one global parallelism for all tasks, because the source parallelism is limited by the number of Kafka partitions.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711932#comment-17711932 ]

Rui Fan commented on FLINK-31757:

Hi [~chesnay], thanks for your reply.
{quote}The obvious solution for the user is to set the parallelism to 100 for everything if the described issues are a problem.
{quote}
In some scenarios, setting one global parallelism for everything wastes resources, and setting a low parallelism for some tasks is a good choice.

For example, a Flink job has many sources and each source has only 5 partitions, so a parallelism of 5 is enough for each source. Or the business logic is very complex, the Flink job has dozens of tasks, and the user sets a reasonable parallelism for each one according to the busy ratio of the tasks (similar to FLIP-AutoScalar).

In general, it is common for the tasks of a job to have different parallelism. In this scenario, it is bad for resource balance that the first few TMs run a large number of tasks while the remaining TMs run only a few.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711925#comment-17711925 ]

Chesnay Schepler commented on FLINK-31757:

This scenario can only occur if the user explicitly configured a parallelism of 100 for vertex A and 5 for the remaining vertices.

The obvious solution for the user is to set the parallelism to 100 for everything if the described issues are a problem. In the proposed scenario of 100 TMs with 1 slot each this is literally the only option.

Just deciding to use p=100 for all vertices ignores an explicit configuration by the user (bad!), as would mucking around with slot sharing groups.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711897#comment-17711897 ]

RocMarshal commented on FLINK-31757:

Thanks [~Weijie Guo] for the well-organized, concise and precise description. There's nothing better. (y)
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711895#comment-17711895 ]

RocMarshal commented on FLINK-31757:

Thank you [~fanrui] & [~huwh] very much. I'll prepare a design document for discussion.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711892#comment-17711892 ]

Weijie Guo commented on FLINK-31757:

[~RocMarshal] I have updated the description of this ticket based on my understanding. If it is not what you want to express, feel free to modify it. :)
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711886#comment-17711886 ]

Rui Fan commented on FLINK-31757:

Thanks [~RocMarshal] for reporting this and [~huwh] for the feedback.
{quote}If the user allocates resources to TM: All TM resources are applied according to the 5 TMs (loading 21-subtasks), then subsequent TM resources will be wasted. If apply the resources based on other TM(only loading a subtask), the 5 TMs resources are insufficient, tasks running on its may have lag.
{quote}
From this information, Flink users have 2 options:
# Set different slot sharing groups for tasks.
# Size the TM resources according to the most heavily loaded TM to ensure performance.

Option 1 is not friendly to Flink users, and Flink SQL doesn't support setting a slot sharing group. Option 2 will waste some TM resources.

As I understand it, balancing the number of tasks per TM brings the actual resource usage of all TMs closer together. From my side, it should be valuable for Flink users and the Flink community.

Please go ahead and prepare a detailed design doc first, thanks. :)
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711880#comment-17711880 ]

Weihua Hu commented on FLINK-31757:

[~RocMarshal] sure, I would like to. Will you prepare a design document?
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711875#comment-17711875 ]

RocMarshal commented on FLINK-31757:

To address this problem, we have implemented balanced task distribution across TaskManagers. I would very much like to contribute it.
[~huwh], would you like to contribute it together?
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711848#comment-17711848 ]

Weihua Hu commented on FLINK-31757:

Hi [~RocMarshal], thanks for reporting this.

For now, you could set different slot sharing groups for tasks to prevent too many tasks from landing in the same slots.

But we at ByteDance also face this imbalance issue, and we did some work on our internal custom Flink. We did two things to spread the tasks evenly:
# Enrich LocalInputPreferredSlotSharingStrategySplit to assign tasks to SlotSharingGroup evenly. I think that could solve the problem you described. In this case, 200 tasks will be assigned to 100 slots, and each slot should have 2 tasks.
# Schedule tasks (slots) to TaskManagers evenly. This is for slots with different task numbers. For example, with 10 source and 20 sink operators we need 20 slots: 10 slots with 2 sub-tasks and 10 slots with 1 sub-task. If we have 10 TaskManagers with 20 slots in total, each TaskManager should get one slot with 2 sub-tasks and one slot with 1 sub-task.

Do you think this would be valuable to the Flink community? I would be glad to contribute it.

[~Weijie Guo] [~martijnvisser]
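The two steps in the comment above can be sketched as a small Python model. This is only an illustration under stated assumptions, not the ByteDance implementation and not Flink's scheduler code: the function names `assign_subtasks_to_slots` and `assign_slots_to_task_managers` are hypothetical, step 1 is modelled as a round-robin with a running cursor, step 2 as a greedy "heaviest slot to least-loaded TaskManager" pass, and "10 source and 20 sink operators" is read as a source with parallelism 10 and a sink with parallelism 20.

```python
def assign_subtasks_to_slots(parallelisms):
    """Step 1 (assumed model): spread subtasks over max(parallelism) slots.

    A running cursor continues where the previous vertex stopped, so slot
    loads differ by at most one and no slot holds two subtasks of one vertex.
    """
    num_slots = max(parallelisms)
    slots = [[] for _ in range(num_slots)]
    cursor = 0
    for vertex, parallelism in enumerate(parallelisms):
        for subtask in range(parallelism):
            slots[(cursor + subtask) % num_slots].append((vertex, subtask))
        cursor = (cursor + parallelism) % num_slots
    return slots


def assign_slots_to_task_managers(slot_loads, num_tms, slots_per_tm):
    """Step 2 (assumed model): place the heaviest slots first, each on the
    TaskManager that currently runs the fewest subtasks and has a free slot."""
    if len(slot_loads) > num_tms * slots_per_tm:
        raise ValueError("not enough slots on the TaskManagers")
    tm_slots = [[] for _ in range(num_tms)]
    tm_load = [0] * num_tms
    heaviest_first = sorted(range(len(slot_loads)), key=lambda s: -slot_loads[s])
    for slot in heaviest_first:
        free = [t for t in range(num_tms) if len(tm_slots[t]) < slots_per_tm]
        target = min(free, key=lambda t: (tm_load[t], t))
        tm_slots[target].append(slot)
        tm_load[target] += slot_loads[slot]
    return tm_slots, tm_load


# Scenario from the ticket: vertex A with p=100 plus 20 vertices with p=5.
ticket_slots = assign_subtasks_to_slots([100] + [5] * 20)
print({len(s) for s in ticket_slots})  # {2}: every slot holds 2 subtasks

# Scenario from the comment: source p=10, sink p=20, 10 TMs with 2 slots each.
loads = [len(s) for s in assign_subtasks_to_slots([10, 20])]
tm_slots, tm_load = assign_slots_to_task_managers(loads, num_tms=10, slots_per_tm=2)
print(tm_load)  # [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
```

In this model each TaskManager ends up with one 2-sub-task slot and one 1-sub-task slot, which is the outcome the comment asks for. A real scheduler would also have to respect input locality and resource profiles, which the sketch ignores.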
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711776#comment-17711776 ]

RocMarshal commented on FLINK-31757:

h1. Problem description and impact

Suppose a job has 21 tasks:
* Task A has a parallelism of 100.
* Each of the remaining tasks has a parallelism of 5.

Each TM has 1 slot, so the job needs to apply for 100 TMs.

h2. Problem Description

Assume the TMs are numbered 0-99. From the perspective of tasks, the actual result after scheduling is: once the job is deployed, 5 TMs each run 21 sub-tasks, while each of the other TMs runs only one sub-task.

h2. Influence

When the user sizes the TM resources:
* If all TMs are sized for the 5 TMs that each run 21 sub-tasks, the resources of the remaining TMs are wasted.
* If all TMs are sized for the other TMs (each running only one sub-task), the 5 heavily loaded TMs have insufficient resources, and the tasks running on them may lag.

From the perspective of resource usage, we expect all subtasks to be evenly distributed across the TMs.
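The numbers in the comment above can be reproduced with a short calculation. This is a simplified model, not Flink's actual scheduling code: it assumes that subtask i of every vertex shares slot i (an index-based reading of slot sharing) and that each TM has exactly one slot, so slot i is TM i. The name `subtasks_per_slot` is hypothetical.

```python
from collections import Counter


def subtasks_per_slot(parallelisms):
    """Count subtasks per slot, assuming subtask i of every vertex shares slot i."""
    load = Counter()
    for parallelism in parallelisms:
        for index in range(parallelism):
            load[index] += 1
    return [load[i] for i in range(max(parallelisms))]


# Task A with parallelism 100, plus 20 remaining tasks with parallelism 5.
load = subtasks_per_slot([100] + [5] * 20)
histogram = Counter(load)
print(sorted(histogram.items()))  # [(1, 95), (21, 5)]
```

Under these assumptions, TMs 0-4 each run 21 sub-tasks and TMs 5-99 each run one, out of 200 sub-tasks in total. An even spread over the same 100 TMs would be 2 sub-tasks per TM.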
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710022#comment-17710022 ]

RocMarshal commented on FLINK-31757:

[~Weijie Guo] Glad to have your attention and reminder~ :)
I'll add the background and case descriptions later. Looking forward to your discussion after that~
Thanks a lot.
[jira] [Commented] (FLINK-31757) Optimize Flink un-balanced tasks scheduling
[ https://issues.apache.org/jira/browse/FLINK-31757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17710021#comment-17710021 ]

Weijie Guo commented on FLINK-31757:

Hi [~RocMarshal], Can you add more details to this ticket? I don't really know what unbalanced scheduling is.