[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-12 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647978#comment-16647978
 ] 

Thomas Weise commented on BEAM-5713:


It isn't consistent or deterministic. Subsequent runs of the same application on
an empty cluster yield different scheduling results. For parallelism of 4, for
example, I have seen slots distributed over 1, 2 or 3 task managers in
different runs.

If the cluster is used for a single pipeline only, then it is possible to
change the number of slots per task manager to get a better distribution. But
overall it would be nice if Flink gave the user an option to specify a
scheduling preference for the job.
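A back-of-the-envelope check of the slots-per-task-manager workaround, assuming the scheduler packs slots onto as few task managers as possible (the behavior observed in this thread, not a documented guarantee). The cluster in the description has 144 / 9 = 16 slots per task manager, so a job needing 8 slots can land on a single host; lowering the slots per task manager (the `taskmanager.numberOfTaskSlots` setting) raises the minimum number of hosts the job must span. The helper name is hypothetical.

```python
import math


def min_task_managers(slots_needed: int, slots_per_tm: int) -> int:
    """Fewest task managers a job can span if slots are packed tightly."""
    if slots_per_tm <= 0:
        raise ValueError("slots_per_tm must be positive")
    return math.ceil(slots_needed / slots_per_tm)


# Cluster from the issue description: 9 task managers, 144 slots in total.
total_slots, task_managers = 144, 9
slots_per_tm = total_slots // task_managers  # 16 slots per task manager

print(min_task_managers(8, slots_per_tm))  # 1: the whole job fits on one host
# Only sensible when the cluster runs this single pipeline:
print(min_task_managers(8, 2))  # 4: at least four hosts
print(min_task_managers(8, 1))  # 8: one slot per host
```

The trade-off is the one stated above: fewer slots per task manager also shrinks total cluster capacity, so this only works when a single pipeline owns the cluster.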

 

> Flink portable runner schedules all tasks of streaming job on same task 
> manager
> ---
>
> Key: BEAM-5713
> URL: https://issues.apache.org/jira/browse/BEAM-5713
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: Different SlotSharingGroup.png, With 
> RichParallelSourceFunction and parallelism 5.png, 
> image-2018-10-11-11-43-50-333.png, image-2018-10-11-16-20-45-221.png
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The cluster has 9 task managers and 144 task slots total. A simple streaming 
> pipeline with parallelism of 8 will get all tasks scheduled on the same task 
> manager, causing the host to be fully booked and the remaining cluster idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-12 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647630#comment-16647630
 ] 

Maximilian Michels commented on BEAM-5713:
--

{quote}
I added more tasks and they are all squeezed into the same slots (only 8 out of 
144 task slots are used).
{quote}

Tasks which directly depend on each other share the same task slot. That is how 
pipelining in Flink works. AFAIK pipelines can get arbitrarily long.


{quote}
The scheduling of all tasks to the same slot is consistent, distribution over 
hosts isn't. With parallelism 4, different result (multiple hosts).
{quote}

Is that consistent behavior for parallelism of 4? I find that it depends on
what the iterator returns from the task slot HashMap. This depends on a number
of factors, e.g. which jobs you ran before and how the TaskManagers registered.
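A toy model of why the result varies between runs, assuming the scheduler simply takes the first free slots the slot map yields, in whatever order it happens to iterate. This is not Flink's scheduler code; the function name and the three iteration orders are made up for illustration.

```python
from typing import Sequence


def task_managers_used(parallelism: int, slot_order: Sequence[str]) -> int:
    """Distinct task managers among the first `parallelism` free slots.

    `slot_order` lists the owning task manager of each free slot in the order
    the slot map happens to iterate them (toy model, not Flink's code).
    """
    if parallelism > len(slot_order):
        raise ValueError("not enough free slots")
    return len(set(slot_order[:parallelism]))


# Same empty cluster, three plausible iteration orders (made-up data).
orders = {
    "grouped": ["tm1", "tm1", "tm1", "tm1", "tm2", "tm2"],
    "pairs": ["tm1", "tm1", "tm2", "tm2", "tm3", "tm3"],
    "mixed": ["tm1", "tm2", "tm3", "tm1", "tm2", "tm3"],
}
for name, order in orders.items():
    print(name, task_managers_used(4, order))
```

With parallelism 4, the three orders yield 1, 2 and 3 task managers respectively, mirroring the spread of outcomes reported for repeated runs.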



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647169#comment-16647169
 ] 

Thomas Weise commented on BEAM-5713:


With parallelism 4, *almost* the same:

!image-2018-10-11-16-20-45-221.png|width=100%!



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646887#comment-16646887
 ] 

Thomas Weise commented on BEAM-5713:


I added more tasks and they are all squeezed into the same slots (only 8 out of 
144 task slots are used).

!image-2018-10-11-11-43-50-333.png!

 



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646847#comment-16646847
 ] 

Thomas Weise commented on BEAM-5713:


Just to confirm my observation: We have 16 task slots per TM. The pipeline has
5 operators and parallelism 8. What I see is that only 8 task slots on a single
task manager are used.
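Why 5 operators at parallelism 8 occupy only 8 slots: within one slot sharing group (Flink's default), subtask i of every operator can share slot i, so the job needs as many slots as its highest parallelism. The sketch below assumes a single slot sharing group and ignores chaining; operator names are placeholders.

```python
def assign_shared_slots(parallelism_by_op: dict) -> dict:
    """Put subtask i of every operator into shared slot i.

    Assumes all operators sit in one slot sharing group; chaining is ignored.
    """
    slots = {}
    for op, parallelism in parallelism_by_op.items():
        for i in range(parallelism):
            slots.setdefault(i, []).append(f"{op}[{i}]")
    return slots


# Five operators at parallelism 8 (operator names are placeholders).
job = {f"op{n}": 8 for n in range(1, 6)}
slots = assign_shared_slots(job)

print(sum(len(s) for s in slots.values()))  # 40 subtasks ...
print(len(slots))                           # ... in only 8 slots
print(len(slots) <= 16)                     # True: fits on one 16-slot TM
```

Eight slots fit comfortably into a single 16-slot task manager, which is consistent with the observation that only one host is used.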

 



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646815#comment-16646815
 ] 

Maximilian Michels commented on BEAM-5713:
--

After testing this with native Flink jobs, looking inside the Scheduler code,
and checking out FLINK-1003, it is clear there is no round-robin task
scheduling logic for distributing tasks across TaskManagers. The location of
task slots is transparent to normal operators. If you have more task slots per
TaskManager than tasks, then very likely all tasks will end up on the same
TaskManager.

If the cluster is sized to the job, this shouldn't impact performance. Also, if
all task slots are filled by multiple jobs, this should be fine. It is only
problematic if the cluster is not fully utilized. Then, spreading the load
across nodes should lead to better performance.
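A minimal comparison of the two placement policies under discussion, assuming uniform task managers: `packed` fills one task manager before moving to the next (the behavior observed here), while `round_robin` is a hypothetical spreading policy of the kind the comment says the scheduler does not have. Both function names are made up for illustration.

```python
def packed(num_tasks: int, tms: int, slots_per_tm: int) -> list:
    """Fill task manager 0 completely, then task manager 1, and so on."""
    if num_tasks > tms * slots_per_tm:
        raise ValueError("not enough slots")
    return [i // slots_per_tm for i in range(num_tasks)]


def round_robin(num_tasks: int, tms: int, slots_per_tm: int) -> list:
    """Hypothetical policy: hand out slots one task manager at a time."""
    if num_tasks > tms * slots_per_tm:
        raise ValueError("not enough slots")
    # Each TM receives at most ceil(num_tasks / tms) <= slots_per_tm tasks.
    return [i % tms for i in range(num_tasks)]


for policy in (packed, round_robin):
    placement = policy(8, 9, 16)  # 8 tasks, 9 TMs, 16 slots each
    print(policy.__name__, len(set(placement)))
```

On the 9 x 16 cluster, packing puts all 8 tasks on one host while round-robin touches 8 hosts. The difference only matters in the under-utilized case described above; once every slot is taken, both policies occupy the same slots.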



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646497#comment-16646497
 ] 

Maximilian Michels commented on BEAM-5713:
--

Putting the native source into a different {{SlotSharingGroup}} yields the 
following:

 !Different SlotSharingGroup.png! 
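A rule-of-thumb slot count for the experiment above: slots are shared only within a slot sharing group (set in the DataStream API with `slotSharingGroup(String)`), so a job needs roughly the highest parallelism of each group, summed over groups. The sketch ignores co-location constraints; the operator names and parallelism 8 are assumptions, not taken from the screenshot.

```python
def required_slots(operators: list) -> int:
    """Slots a job needs: max parallelism per slot sharing group, summed.

    `operators` holds (name, parallelism, slot_sharing_group) tuples.
    Rule of thumb only; co-location constraints are ignored.
    """
    max_per_group = {}
    for _name, parallelism, group in operators:
        max_per_group[group] = max(max_per_group.get(group, 0), parallelism)
    return sum(max_per_group.values())


# Hypothetical three-operator job at parallelism 8.
shared = [("source", 8, "default"), ("stage", 8, "default"), ("sink", 8, "default")]
isolated = [("source", 8, "source"), ("stage", 8, "default"), ("sink", 8, "default")]

print(required_slots(shared))    # 8
print(required_slots(isolated))  # 16
```

By this arithmetic, isolating the source doubles the slot demand, yet 16 slots could still fit on one 16-slot TaskManager, so a separate group on its own does not guarantee spreading.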



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646493#comment-16646493
 ] 

Thomas Weise commented on BEAM-5713:


That's something we should fix for the standard Impulse translation.

However, my test case is using a native transform that is already translated
into a RichParallelSourceFunction. I tried turning off operator chaining. With
chaining disabled, there are two tasks, but all subtasks are still scheduled on
the same TM.
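A simplified model of what disabling chaining (`StreamExecutionEnvironment#disableOperatorChaining()`) changes. Assumption: in a linear job with forward edges, neighbours chain when chaining is enabled and their parallelism matches; real Flink checks further conditions (partitioner, slot sharing group, chaining strategy). The two-operator job is hypothetical.

```python
def count_tasks(pipeline: list, chaining_enabled: bool = True) -> int:
    """Number of tasks for a linear job of (name, parallelism) operators.

    Simplified rule (assumption): two neighbours chain when chaining is
    enabled and they have the same parallelism.
    """
    if not pipeline:
        return 0
    tasks = 1
    for (_, prev_p), (_, p) in zip(pipeline, pipeline[1:]):
        if not (chaining_enabled and prev_p == p):
            tasks += 1  # chain is broken: a new task starts here
    return tasks


job = [("source", 8), ("executable_stage", 8)]  # hypothetical two-operator job
print(count_tasks(job))                          # 1 chained task
print(count_tasks(job, chaining_enabled=False))  # 2 tasks
```

Under slot sharing, the two unchained tasks still need only 8 slots in total (subtask i of each shares slot i), so disabling chaining changes the task count without changing how many slots, or hosts, the job needs.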



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646479#comment-16646479
 ] 

Maximilian Michels commented on BEAM-5713:
--

It seems this was largely by chance, because your example doesn't use Impulse
transforms. However, I've opened a PR which fixes sources written via portable
DoFns.



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646345#comment-16646345
 ] 

Maximilian Michels commented on BEAM-5713:
--

 !With RichParallelSourceFunction and parallelism 5.png! 

The load is not distributed completely equally across the two TaskManagers, but
it is an improvement.



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646343#comment-16646343
 ] 

Maximilian Michels commented on BEAM-5713:
--

Actually, this is a FlinkRunner bug. The Impulse source is a regular 
{{SourceFunction}}, not a {{ParallelSourceFunction}}. So the scheduler always 
prefers to schedule on the node where the source gets deployed (to avoid 
network traffic). After I change it to {{ParallelSourceFunction}}, the 
scheduling is as expected.
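A toy illustration of the locality effect described above. Assumptions, not Flink's actual scheduler code: the non-parallel source runs as one subtask on `source_tm`, every downstream subtask reads from it, the source's slot is shared with the first consumer so it is not counted separately, and the scheduler takes a free slot on the input's host whenever one exists. The function name and cluster layout are hypothetical.

```python
from collections import Counter


def place_consumers(num_consumers: int, source_tm: str, free_slots: dict) -> list:
    """Return the task manager chosen for each downstream subtask."""
    free = dict(free_slots)  # copy: do not mutate the caller's cluster view
    placement = []
    for _ in range(num_consumers):
        if free.get(source_tm, 0) > 0:
            tm = source_tm  # local slot available: avoid network traffic
        else:
            # Fall back to the first task manager that still has a free slot.
            tm = next((t for t, n in free.items() if n > 0), None)
            if tm is None:
                raise ValueError("not enough free slots")
        free[tm] -= 1
        placement.append(tm)
    return placement


cluster = {f"tm{i}": 16 for i in range(1, 10)}  # 9 TMs x 16 slots
print(Counter(place_consumers(8, "tm1", cluster)))   # Counter({'tm1': 8})
print(Counter(place_consumers(20, "tm1", cluster)))  # Counter({'tm1': 16, 'tm2': 4})
```

In this model, consumers only leave the source's host once it is full. With a ParallelSourceFunction, the source subtasks have no inputs, so there is no single host pulling every consumer towards it; where they land then depends on the slot pool order.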



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-11 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646325#comment-16646325
 ] 

Maximilian Michels commented on BEAM-5713:
--

This is a "feature". See 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Distributing-Tasks-over-Task-manager-td9481.html
 and FLINK-1003.



[jira] [Commented] (BEAM-5713) Flink portable runner schedules all tasks of streaming job on same task manager

2018-10-10 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645821#comment-16645821
 ] 

Thomas Weise commented on BEAM-5713:


Can be reproduced with branch: 
https://github.com/lyft/beam/tree/micah_process_streaming_leak

Simplified single task pipeline: 
https://gist.github.com/tweise/09ec82446f74bb534d488209ad88e75f
