[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647978#comment-16647978 ]

Thomas Weise commented on BEAM-5713:

It isn't consistent / deterministic. Subsequent runs of the same application on an empty cluster yield different scheduling results. For example, with a parallelism of 4, I have seen slots distributed over 1, 2 or 3 task managers in different runs.

If the cluster is used for a single pipeline only, then it is possible to change the slots per task manager to get a better distribution. But overall it would be nice if Flink provided the user an option to specify a preference for the job.

> Flink portable runner schedules all tasks of streaming job on same task manager
> ---
>
> Key: BEAM-5713
> URL: https://issues.apache.org/jira/browse/BEAM-5713
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Thomas Weise
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability, portability-flink
> Attachments: Different SlotSharingGroup.png, With RichParallelSourceFunction and parallelism 5.png, image-2018-10-11-11-43-50-333.png, image-2018-10-11-16-20-45-221.png
> Time Spent: 40m
> Remaining Estimate: 0h
>
> The cluster has 9 task managers and 144 task slots total. A simple streaming pipeline with parallelism of 8 will get all tasks scheduled on the same task manager, causing the host to be fully booked and the remaining cluster idle.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
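A note on the suggestion above to change the slots per task manager: fewer slots per TaskManager raise the minimum number of TaskManagers a job has to touch, which is why this can force a better spread. The sketch below is only back-of-the-envelope arithmetic. The class and method names are hypothetical and not part of Flink or Beam, and it computes a lower bound, not what the scheduler will actually pick.

```java
// Hypothetical helper for illustration; not part of Flink or Beam.
final class SlotSpread {

    /** Smallest number of TaskManagers that can host `parallelism` slots. */
    static int minTaskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        // Ceiling division: a partially used TaskManager still counts as one.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int parallelism = 8; // parallelism from the issue description
        for (int slots : new int[] {16, 4, 2, 1}) {
            System.out.println(slots + " slots/TM -> at least "
                    + minTaskManagers(parallelism, slots) + " TaskManager(s)");
        }
    }
}
```

With 16 slots per TaskManager the bound is 1, so the whole job can land on one host; with 1 slot per TaskManager it is 8. The trade-off is that the total slot count drops as well (9 slots on the 9 task managers from the issue description), which is presumably why this only suits a cluster that runs a single pipeline.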
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647630#comment-16647630 ]

Maximilian Michels commented on BEAM-5713:

{quote}
I added more tasks and they are all squeezed into the same slots (only 8 out of 144 task slots are used).
{quote}

Tasks which directly depend on each other share the same task slot. That is how pipelining in Flink works. AFAIK pipelines can get arbitrarily long.

{quote}
The scheduling of all tasks to the same slot is consistent, distribution over hosts isn't. With parallelism 4, different result (multiple hosts).
{quote}

Is that consistent behavior for a parallelism of 4? I find that it depends on what the iterator returns from the task slot HashMap. This in turn depends on a number of factors, e.g. what jobs you ran before and how the TaskManagers registered.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647169#comment-16647169 ]

Thomas Weise commented on BEAM-5713:

With parallelism 4, *almost* same

!image-2018-10-11-16-20-45-221.png|width=100%!
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646887#comment-16646887 ]

Thomas Weise commented on BEAM-5713:

I added more tasks and they are all squeezed into the same slots (only 8 out of 144 task slots are used).

!image-2018-10-11-11-43-50-333.png!
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646847#comment-16646847 ]

Thomas Weise commented on BEAM-5713:

Just to confirm my observation: We have 16 task slots per TM. The pipeline has 5 operators and parallelism 8. What I see is that only 8 task slots on a single task manager are used.
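The numbers in this observation (5 operators, parallelism 8, only 8 slots in use) match what slot sharing would predict, assuming all operators sit in a single slot sharing group, which is Flink's default. A minimal sketch of that arithmetic follows. The class and method names are hypothetical, and the rule it encodes (one slot per parallel instance of the widest operator in each group) is a simplification of Flink's behavior, not its scheduler code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink or Beam.
final class SlotSharingEstimate {

    /**
     * Estimates the slots a job needs.
     *
     * @param groups slot sharing group name -> parallelism of each operator in that group
     */
    static int requiredSlots(Map<String, List<Integer>> groups) {
        int total = 0;
        for (List<Integer> parallelisms : groups.values()) {
            int max = 0;
            for (int p : parallelisms) {
                max = Math.max(max, p); // operators in one group share slots
            }
            total += max; // separate groups never share a slot
        }
        return total;
    }

    public static void main(String[] args) {
        // Five operators with parallelism 8, all in one group.
        Map<String, List<Integer>> oneGroup = Map.of("default", List.of(8, 8, 8, 8, 8));
        // Same pipeline with the source moved into its own group.
        Map<String, List<Integer>> twoGroups =
                Map.of("source", List.of(8), "default", List.of(8, 8, 8, 8));

        System.out.println("one group:  " + requiredSlots(oneGroup));
        System.out.println("two groups: " + requiredSlots(twoGroups));
    }
}
```

Under these assumptions the single-group pipeline needs 8 slots, not 5 × 8 = 40, and moving the source into its own group raises the estimate to 16.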
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646815#comment-16646815 ]

Maximilian Michels commented on BEAM-5713:

After testing this with native Flink jobs, looking inside the Scheduler code, and checking out FLINK-1003, it is clear there is no round-robin task scheduling logic for distributing tasks across TaskManagers. The location of task slots is transparent for normal operators. If you have more task slots per TaskManager than tasks, then very likely all tasks will end up on the same TaskManager.

If the cluster is sized to the job, this shouldn't impact performance. Also, if all task slots are filled by multiple jobs, this should be fine. It is only problematic if the cluster is not fully utilized. Then, spreading the load across nodes should lead to better performance.
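To make the difference concrete, here is a toy comparison of two placement strategies on the cluster from the issue description (9 TaskManagers with 16 slots each, 8 tasks). This is a deliberately simplified model with hypothetical names. It is not Flink's scheduler, whose choice of slot also depends on factors such as slot registration order.

```java
import java.util.Arrays;

// Toy model for illustration; not Flink's scheduling code.
final class PlacementSketch {

    /** Each task takes a slot on the first TaskManager that still has one free. */
    static int[] firstFit(int taskManagers, int slotsPerTm, int tasks) {
        checkCapacity(taskManagers, slotsPerTm, tasks);
        int[] used = new int[taskManagers];
        for (int t = 0; t < tasks; t++) {
            int tm = 0;
            while (used[tm] == slotsPerTm) {
                tm++; // skip full TaskManagers
            }
            used[tm]++;
        }
        return used;
    }

    /** Tasks are dealt out to TaskManagers in turn, skipping full ones. */
    static int[] roundRobin(int taskManagers, int slotsPerTm, int tasks) {
        checkCapacity(taskManagers, slotsPerTm, tasks);
        int[] used = new int[taskManagers];
        int next = 0;
        for (int t = 0; t < tasks; t++) {
            while (used[next] == slotsPerTm) {
                next = (next + 1) % taskManagers;
            }
            used[next]++;
            next = (next + 1) % taskManagers;
        }
        return used;
    }

    private static void checkCapacity(int taskManagers, int slotsPerTm, int tasks) {
        if ((long) taskManagers * slotsPerTm < tasks) {
            throw new IllegalArgumentException("not enough slots for " + tasks + " tasks");
        }
    }

    public static void main(String[] args) {
        System.out.println("first fit:   " + Arrays.toString(firstFit(9, 16, 8)));
        System.out.println("round robin: " + Arrays.toString(roundRobin(9, 16, 8)));
    }
}
```

In this model, first fit puts all 8 tasks on the first TaskManager, while round robin uses one slot on each of 8 TaskManagers. Once the cluster is close to full, both strategies end up using nearly every host, which is consistent with the point that the effect only matters when the cluster is underutilized.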
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646497#comment-16646497 ]

Maximilian Michels commented on BEAM-5713:

Putting the native source into a different {{SlotSharingGroup}} yields the following:

!Different SlotSharingGroup.png!
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646493#comment-16646493 ]

Thomas Weise commented on BEAM-5713:

That's something we should fix for the standard Impulse translation. However, my test case is using a native transform that is already translated into a RichParallelSourceFunction.

I tried to turn off operator chaining. Without it, there are two tasks, but still all subtasks are scheduled on the same TM.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646479#comment-16646479 ]

Maximilian Michels commented on BEAM-5713:

Seems like this was largely by chance because your example doesn't use Impulse transforms. However, I've opened a PR which fixes sources written via portable DoFns.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646345#comment-16646345 ]

Maximilian Michels commented on BEAM-5713:

!With RichParallelSourceFunction and parallelism 5.png!

The load is not distributed completely equally across two TaskManagers, but it is an improvement.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646343#comment-16646343 ]

Maximilian Michels commented on BEAM-5713:

Actually, this is a FlinkRunner bug. The Impulse source is a regular {{SourceFunction}}, not a {{ParallelSourceFunction}}. So the scheduler always prefers to schedule on the node where the source gets deployed (to avoid network traffic). After I changed it to {{ParallelSourceFunction}}, the scheduling is as expected.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646325#comment-16646325 ]

Maximilian Michels commented on BEAM-5713:

This is a "feature". See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Distributing-Tasks-over-Task-manager-td9481.html and FLINK-1003.
[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager
[ https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645821#comment-16645821 ]

Thomas Weise commented on BEAM-5713:

Can be reproduced with branch: https://github.com/lyft/beam/tree/micah_process_streaming_leak

Simplified single task pipeline: https://gist.github.com/tweise/09ec82446f74bb534d488209ad88e75f