Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-12-05 Thread Thomas Weise
PR for this is now open: https://github.com/apache/beam/pull/10313

Hey Max,

Thanks for the feedback.


On Sun, Nov 24, 2019 at 2:04 PM Maximilian Michels  wrote:

> Load-balancing the worker selection for bundle execution sounds like the
> solution to uneven work distribution across the workers. Some comments:
>
> (1) I could imagine that in case of long-running bundle execution (e.g.
> model execution), this could stall upstream operators because their busy
> downstream operators hold all available workers, thus also letting the
> pipeline throughput/latency suffer.
>

When there is a bottleneck in a downstream operator, the upstream operators
will eventually back up due to backpressure, regardless of whether workers
are available. Results only become available at the processing rate of
the slowest operator/stage.

The worker resources on a given node are limited. For the pipeline to
function, all operators need to make progress. There are typically more
subtasks/threads than there are worker processes, hence workers are shared.
We observed that when bundles of an executable stage are always pinned to
the same worker, the workers are poorly utilized.

The proposed change is therefore to (optionally) load-balance the distribution
of bundles over workers, so that progress can be made whenever any worker is
available.

With any executable stage able to execute on any available worker, we see
improved utilization.
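
To make this concrete, here is a minimal Java sketch of the idea (illustrative
only, not the actual Flink runner code): a per-TaskManager pool of SDK worker
environments from which any executable stage borrows the next free worker,
first come, first served, and returns it once the bundle finishes.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Illustrative only: a per-TaskManager pool of SDK worker environments.
 * Any executable stage borrows the next free worker (first come, first
 * served) instead of staying pinned to one worker for the life of the job.
 */
public class WorkerPool<W> {
  private final BlockingQueue<W> freeWorkers = new LinkedBlockingQueue<>();

  public WorkerPool(Iterable<W> workers) {
    workers.forEach(freeWorkers::add);
  }

  /** Blocks until some worker is idle, then hands it to the caller. */
  public W acquire() throws InterruptedException {
    return freeWorkers.take();
  }

  /** Returns the worker to the pool once its bundle has completed. */
  public void release(W worker) {
    freeWorkers.add(worker);
  }

  /** Runs one bundle on whichever worker becomes available first. */
  public <B> void runBundle(B bundle, BundleRunner<W, B> runner) throws Exception {
    W worker = acquire();
    try {
      runner.run(worker, bundle);
    } finally {
      release(worker);
    }
  }

  /** Hypothetical hook standing in for handing a bundle to the SDK harness. */
  public interface BundleRunner<W, B> {
    void run(W worker, B bundle) throws Exception;
  }
}

With that shape, a slow bundle from one stage never reserves an idle worker
that another stage could be using.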


>
> Instead of balancing across _all_ the workers available on a particular
> node (aka TaskManager), it could make sense to just increase the share
> of SDK workers for a particular executable stage. At the moment, each
> stage just receives a single worker. Instead, it could receive a higher
> share of workers, which could either be exclusive or overlap with a
> share of another executable stage. Essentially, this is an extension to
> what you are proposing to ensure stages make progress.
>

If there is headroom, more workers can be allocated and they are going to
be used for any work available. The implementation as it stands ensures
fairness (first come, first served). I doubt that worker partitioning as you
suggest would improve the situation. It would essentially be a vertical
partitioning of resources, but we need all stages to make progress to
compute a result.


> (2) Another concern is that load balancing across multiple worker
> instances would render state caching useless. We need to make the Runner
> aware of it such that it can turn off state caching. With the approach
> of multiple workers per stage in (1), it would also be possible to keep
> the state caching, if we divided the key range across the workers.
>

State caching and load balancing are mutually exclusive and I added a check
to enforce that. For the use case I'm looking at, the cost of state access
is negligible compared to that of running the expensive / high-latency
operations.
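
For illustration, the guard can be as simple as the following; the option
names here are hypothetical stand-ins, not the actual Flink runner options:

final class OptionsValidation {
  // Hypothetical option names, for illustration only: the two features must
  // not be enabled at the same time.
  static void validate(boolean loadBalanceBundles, int stateCacheSize) {
    if (loadBalanceBundles && stateCacheSize > 0) {
      throw new IllegalArgumentException(
          "Load-balanced bundle distribution is incompatible with state "
              + "caching; disable one of the two.");
    }
  }
}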


>
> Cheers,
> Max
>
> On 23.11.19 18:42, Thomas Weise wrote:
> > JIRA: https://issues.apache.org/jira/browse/BEAM-8816
> >
> >
> > On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise wrote:
> >
> > Hi Luke,
> >
> > Thanks for the background and it is exciting to see the progress on
> > the SDF side. It will help with this use case and many other
> > challenges. I imagine the Python user code would be able to
> > determine that it is bogged down with high latency record processing
> > (based on the duration it actually took to process previous records)
> > and opt to send back remaining work to the runner.
> >
> > Until the Flink runner supports reassignment of work, I'm planning
> > to implement the simple bundle distribution approach referred to
> > before. We will test it in our environment and contribute it back if
> > the results are good.
> >
> > Thomas
> >
> >
> >
> > On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik wrote:
> >
> > Dataflow has run into this issue as well. Dataflow has "work
> > items" that are converted into bundles that are executed on the
> > SDK. Each work item does a greedy assignment to the SDK worker
> > with the fewest work items assigned. As you surmised, we use SDF
> > splitting in batch pipelines to balance work. We would like to
> > use splitting of SDFs in streaming pipelines as well but
> > Dataflow can't handle it as of right now.
> >
> > As part of a few PRs, I have added basic SDF expansion to the
> > shared runner lib and slowly exposed the runner side hooks[2, 3]
> > for SDK initiated checkpointing and bundle finalization. There
> > are still a few pieces left:
> > * exposing an API so the bundle can be split during execution
> > * adding the limited depth splitting logic that would add a
> > basic form of dynamic work rebalancing for all runners that
> > decide to use it
> >
> 

Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-24 Thread Maximilian Michels
Load-balancing the worker selection for bundle execution sounds like the 
solution to uneven work distribution across the workers. Some comments:


(1) I could imagine that in case of long-running bundle execution (e.g. 
model execution), this could stall upstream operators because their busy 
downstream operators hold all available workers, thus also letting the 
pipeline throughput/latency suffer.


Instead of balancing across _all_ the workers available on a particular
node (aka TaskManager), it could make sense to just increase the share
of SDK workers for a particular executable stage. At the moment, each 
stage just receives a single worker. Instead, it could receive a higher 
share of workers, which could either be exclusive or overlap with a 
share of another executable stage. Essentially, this is an extension to 
what you are proposing to ensure stages make progress.


(2) Another concern is that load balancing across multiple worker 
instances would render state caching useless. We need to make the Runner 
aware of it such that it can turn off state caching. With the approach 
of multiple workers per stage in (1), it would also be possible to keep 
the state caching, if we divided the key range across the workers.
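
One simple way to sketch that idea: route each key to a fixed worker within
the stage's share, so a key's cached state always lives on the worker that
processes it. The sketch below uses hash partitioning as a stand-in for a
key-range split and is illustrative only.

import java.util.List;

/**
 * Illustrative only: if a stage had a share of several SDK workers, routing
 * each key to a fixed worker within that share would keep the key's cached
 * state on the worker that owns it while still spreading load across the
 * share.
 */
final class KeyAffinityRouter<W> {
  private final List<W> share;

  KeyAffinityRouter(List<W> share) {
    this.share = share;
  }

  W workerFor(Object key) {
    return share.get(Math.floorMod(key.hashCode(), share.size()));
  }
}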


Cheers,
Max

On 23.11.19 18:42, Thomas Weise wrote:

JIRA: https://issues.apache.org/jira/browse/BEAM-8816


On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise wrote:


Hi Luke,

Thanks for the background and it is exciting to see the progress on
the SDF side. It will help with this use case and many other
challenges. I imagine the Python user code would be able to
determine that it is bogged down with high latency record processing
(based on the duration it actually took to process previous records)
and opt to send back remaining work to the runner.

Until the Flink runner supports reassignment of work, I'm planning
to implement the simple bundle distribution approach referred to
before. We will test it in our environment and contribute it back if
the results are good.

Thomas



On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:

Dataflow has run into this issue as well. Dataflow has "work
items" that are converted into bundles that are executed on the
SDK. Each work item does a greedy assignment to the SDK worker
with the fewest work items assigned. As you surmised, we use SDF
splitting in batch pipelines to balance work. We would like to
use splitting of SDFs in streaming pipelines as well but
Dataflow can't handle it as of right now.

As part of a few PRs, I have added basic SDF expansion to the
shared runner lib and slowly exposed the runner side hooks[2, 3]
for SDK initiated checkpointing and bundle finalization. There
are still a few pieces left:
* exposing an API so the bundle can be split during execution
* adding the limited depth splitting logic that would add a
basic form of dynamic work rebalancing for all runners that
decide to use it

1: https://github.com/apache/beam/pull/10045
2: https://github.com/apache/beam/pull/10065
3: https://github.com/apache/beam/pull/10074

On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:

We found a problem with uneven utilization of SDK workers
causing excessive latency with Streaming/Python/Flink.
Remember that with Python, we need to execute multiple
worker processes on a machine instead of relying on threads
in a single worker, which requires the runner to make a
decision to which worker to give a bundle for processing.

The Flink runner has knobs to influence the number of
records per bundle and the maximum duration for a bundle.
But since the runner does not understand the cost of an
individual record, it is possible that the duration of
bundles fluctuates significantly due to the skew in
processing time of individual records. And unless the bundle
size is 1, multiple expensive records could be allocated to
a single bundle before the cutoff time is reached. We notice
this with a pipeline that executes models, but there are
other use cases where the cost of individual records can
vary significantly.

Additionally, the Flink runner establishes the association
between the subtask managing an executable stage and the SDK
worker during initialization, lasting for the duration of
the job. In other words, bundles for the same executable
stage will always be sent to the same SDK worker. When the
execution time skew is tied to specific keys (stateful
processing), it further aggravates the issue.

Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-23 Thread Thomas Weise
JIRA: https://issues.apache.org/jira/browse/BEAM-8816


On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise  wrote:

> Hi Luke,
>
> Thanks for the background and it is exciting to see the progress on the
> SDF side. It will help with this use case and many other challenges. I
> imagine the Python user code would be able to determine that it is bogged
> down with high latency record processing (based on the duration it actually
> took to process previous records) and opt to send back remaining work to
> the runner.
>
> Until the Flink runner supports reassignment of work, I'm planning to
> implement the simple bundle distribution approach referred to before. We
> will test it in our environment and contribute it back if the results are
> good.
>
> Thomas
>
>
>
> On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik  wrote:
>
>> Dataflow has run into this issue as well. Dataflow has "work items" that
>> are converted into bundles that are executed on the SDK. Each work item
>> does a greedy assignment to the SDK worker with the fewest work items
>> assigned. As you surmised, we use SDF splitting in batch pipelines to
>> balance work. We would like to use splitting of SDFs in streaming pipelines
>> as well but Dataflow can't handle it as of right now.
>>
>> As part of a few PRs, I have added basic SDF expansion to the shared
>> runner lib and slowly exposed the runner side hooks[2, 3] for SDK initiated
>> checkpointing and bundle finalization. There are still a few pieces left:
>> * exposing an API so the bundle can be split during execution
>> * adding the limited depth splitting logic that would add a basic form of
>> dynamic work rebalancing for all runners that decide to use it
>>
>> 1: https://github.com/apache/beam/pull/10045
>> 2: https://github.com/apache/beam/pull/10065
>> 3: https://github.com/apache/beam/pull/10074
>>
>> On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise  wrote:
>>
>>> We found a problem with uneven utilization of SDK workers causing
>>> excessive latency with Streaming/Python/Flink. Remember that with Python,
>>> we need to execute multiple worker processes on a machine instead of
>>> relying on threads in a single worker, which requires the runner to make a
>>> decision to which worker to give a bundle for processing.
>>>
>>> The Flink runner has knobs to influence the number of records per bundle
>>> and the maximum duration for a bundle. But since the runner does not
>>> understand the cost of an individual record, it is possible that the
>>> duration of bundles fluctuates significantly due to the skew in processing
>>> time of individual records. And unless the bundle size is 1, multiple
>>> expensive records could be allocated to a single bundle before the cutoff
>>> time is reached. We notice this with a pipeline that executes models, but
>>> there are other use cases where the cost of individual records can vary
>>> significantly.
>>>
>>> Additionally, the Flink runner establishes the association between the
>>> subtask managing an executable stage and the SDK worker during
>>> initialization, lasting for the duration of the job. In other words,
>>> bundles for the same executable stage will always be sent to the same SDK
>>> worker. When the execution time skew is tied to specific keys (stateful
>>> processing), it further aggravates the issue.
>>>
>>> I started experimenting with the ability to schedule bundles on any
>>> available worker. Initially I'm trying a very basic approach, starting
>>> processing of a bundle only on a free environment (one that does not
>>> process any other bundle). This effectively removes the pipelining between
>>> subtask and SDK worker. Potentially waiting for an available environment is
>>> acceptable in this case, as the per bundle overhead is very small compared
>>> to the per record cost.
>>>
>>> However, even if this suffices for the use case I'm looking at, this is
>>> an area that will probably need more work going forward. Rather than the
>>> runner guessing how to schedule bundles, I think that the best long term
>>> solution would be SDF, where the user code can decide that something takes
>>> too long and defer remaining work (and the runner can redistribute it).
>>>
>>> Curious if anyone else has run into this issue yet and what other ideas
>>> there may be?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>


Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-21 Thread Thomas Weise
Hi Luke,

Thanks for the background and it is exciting to see the progress on the SDF
side. It will help with this use case and many other challenges. I
imagine the Python user code would be able to determine that it is bogged
down with high latency record processing (based on the duration it actually
took to process previous records) and opt to send back remaining work to
the runner.

Until the Flink runner supports reassignment of work, I'm planning to
implement the simple bundle distribution approach referred to before. We
will test it in our environment and contribute it back if the results are
good.

Thomas



On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik  wrote:

> Dataflow has run into this issue as well. Dataflow has "work items" that
> are converted into bundles that are executed on the SDK. Each work item
> does a greedy assignment to the SDK worker with the fewest work items
> assigned. As you surmised, we use SDF splitting in batch pipelines to
> balance work. We would like to use splitting of SDFs in streaming pipelines
> as well but Dataflow can't handle it as of right now.
>
> As part of a few PRs, I have added basic SDF expansion to the shared
> runner lib and slowly exposed the runner side hooks[2, 3] for SDK initiated
> checkpointing and bundle finalization. There are still a few pieces left:
> * exposing an API so the bundle can be split during execution
> * adding the limited depth splitting logic that would add a basic form of
> dynamic work rebalancing for all runners that decide to use it
>
> 1: https://github.com/apache/beam/pull/10045
> 2: https://github.com/apache/beam/pull/10065
> 3: https://github.com/apache/beam/pull/10074
>
> On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise  wrote:
>
>> We found a problem with uneven utilization of SDK workers causing
>> excessive latency with Streaming/Python/Flink. Remember that with Python,
>> we need to execute multiple worker processes on a machine instead of
>> relying on threads in a single worker, which requires the runner to make a
>> decision to which worker to give a bundle for processing.
>>
>> The Flink runner has knobs to influence the number of records per bundle
>> and the maximum duration for a bundle. But since the runner does not
>> understand the cost of an individual record, it is possible that the
>> duration of bundles fluctuates significantly due to the skew in processing
>> time of individual records. And unless the bundle size is 1, multiple
>> expensive records could be allocated to a single bundle before the cutoff
>> time is reached. We notice this with a pipeline that executes models, but
>> there are other use cases where the cost of individual records can vary
>> significantly.
>>
>> Additionally, the Flink runner establishes the association between the
>> subtask managing an executable stage and the SDK worker during
>> initialization, lasting for the duration of the job. In other words,
>> bundles for the same executable stage will always be sent to the same SDK
>> worker. When the execution time skew is tied to specific keys (stateful
>> processing), it further aggravates the issue.
>>
>> I started experimenting with the ability to schedule bundles on any
>> available worker. Initially I'm trying a very basic approach, starting
>> processing of a bundle only on a free environment (one that does not
>> process any other bundle). This effectively removes the pipelining between
>> subtask and SDK worker. Potentially waiting for an available environment is
>> acceptable in this case, as the per bundle overhead is very small compared
>> to the per record cost.
>>
>> However, even if this suffices for the use case I'm looking at, this is
>> an area that will probably need more work going forward. Rather than the
>> runner guessing how to schedule bundles, I think that the best long term
>> solution would be SDF, where the user code can decide that something takes
>> too long and defer remaining work (and the runner can redistribute it).
>>
>> Curious if anyone else has run into this issue yet and what other ideas
>> there may be?
>>
>> Thanks,
>> Thomas
>>
>>


Re: Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-20 Thread Luke Cwik
Dataflow has run into this issue as well. Dataflow has "work items" that
are converted into bundles that are executed on the SDK. Each work item
does a greedy assignment to the SDK worker with the fewest work items
assigned. As you surmised, we use SDF splitting in batch pipelines to
balance work. We would like to use splitting of SDFs in streaming pipelines
as well but Dataflow can't handle it as of right now.
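
A minimal sketch of that greedy policy (illustrative only, not Dataflow's
actual code): each new work item goes to the SDK worker with the fewest items
currently in flight, and the count is decremented when the item completes.

import java.util.HashMap;
import java.util.Map;

/** Illustrative only: greedy "fewest in-flight work items" assignment. */
final class LeastLoadedAssigner<W> {
  private final Map<W, Integer> inFlight = new HashMap<>();

  LeastLoadedAssigner(Iterable<W> workers) {
    for (W worker : workers) {
      inFlight.put(worker, 0);
    }
  }

  /** Picks the worker with the fewest work items currently assigned. */
  synchronized W assign() {
    W best = null;
    for (Map.Entry<W, Integer> entry : inFlight.entrySet()) {
      if (best == null || entry.getValue() < inFlight.get(best)) {
        best = entry.getKey();
      }
    }
    if (best == null) {
      throw new IllegalStateException("no SDK workers registered");
    }
    inFlight.merge(best, 1, Integer::sum);
    return best;
  }

  /** Called when a work item finishes so the worker's load drops again. */
  synchronized void complete(W worker) {
    inFlight.merge(worker, -1, Integer::sum);
  }
}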

As part of a few PRs, I have added basic SDF expansion to the shared runner
lib and slowly exposed the runner side hooks[2, 3] for SDK initiated
checkpointing and bundle finalization. There are still a few pieces left:
* exposing an API so the bundle can be split during execution
* adding the limited depth splitting logic that would add a basic form of
dynamic work rebalancing for all runners that decide to use it

1: https://github.com/apache/beam/pull/10045
2: https://github.com/apache/beam/pull/10065
3: https://github.com/apache/beam/pull/10074

On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise  wrote:

> We found a problem with uneven utilization of SDK workers causing
> excessive latency with Streaming/Python/Flink. Remember that with Python,
> we need to execute multiple worker processes on a machine instead of
> relying on threads in a single worker, which requires the runner to make a
> decision to which worker to give a bundle for processing.
>
> The Flink runner has knobs to influence the number of records per bundle
> and the maximum duration for a bundle. But since the runner does not
> understand the cost of an individual record, it is possible that the
> duration of bundles fluctuates significantly due to the skew in processing
> time of individual records. And unless the bundle size is 1, multiple
> expensive records could be allocated to a single bundle before the cutoff
> time is reached. We notice this with a pipeline that executes models, but
> there are other use cases where the cost of individual records can vary
> significantly.
>
> Additionally, the Flink runner establishes the association between the
> subtask managing an executable stage and the SDK worker during
> initialization, lasting for the duration of the job. In other words,
> bundles for the same executable stage will always be sent to the same SDK
> worker. When the execution time skew is tied to specific keys (stateful
> processing), it further aggravates the issue.
>
> I started experimenting with the ability to schedule bundles on any
> available worker. Initially I'm trying a very basic approach, starting
> processing of a bundle only on a free environment (one that does not
> process any other bundle). This effectively removes the pipelining between
> subtask and SDK worker. Potentially waiting for an available environment is
> acceptable in this case, as the per bundle overhead is very small compared
> to the per record cost.
>
> However, even if this suffices for the use case I'm looking at, this is an
> area that will probably need more work going forward. Rather than the
> runner guessing how to schedule bundles, I think that the best long term
> solution would be SDF, where the user code can decide that something takes
> too long and defer remaining work (and the runner can redistribute it).
>
> Curious if anyone else has run into this issue yet and what other ideas
> there may be?
>
> Thanks,
> Thomas
>
>


Portable runner bundle scheduling (Streaming/Python/Flink)

2019-11-20 Thread Thomas Weise
We found a problem with uneven utilization of SDK workers causing excessive
latency with Streaming/Python/Flink. Remember that with Python, we need to
execute multiple worker processes on a machine instead of relying on
threads in a single worker, which requires the runner to decide which
worker should receive a given bundle for processing.

The Flink runner has knobs to influence the number of records per bundle
and the maximum duration for a bundle. But since the runner does not
understand the cost of an individual record, it is possible that the
duration of bundles fluctuates significantly due to the skew in processing
time of individual records. And unless the bundle size is 1, multiple
expensive records could be allocated to a single bundle before the cutoff
time is reached. We notice this with a pipeline that executes models, but
there are other use cases where the cost of individual records can vary
significantly.
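
For reference, here is a sketch of setting those knobs via the Java runner
options (assuming the FlinkPipelineOptions accessors named below; a portable
Python pipeline would pass the corresponding --max_bundle_size and
--max_bundle_time_millis pipeline options instead):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BundleKnobs {
  public static void main(String[] args) {
    // The two knobs cap how many records and how much wall-clock time a
    // single bundle may span; they cannot account for per-record cost skew.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setMaxBundleSize(100L);         // at most 100 records per bundle
    options.setMaxBundleTimeMillis(1000L);  // cut a bundle after one second
  }
}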

Additionally, the Flink runner establishes the association between the
subtask managing an executable stage and the SDK worker during
initialization, lasting for the duration of the job. In other words,
bundles for the same executable stage will always be sent to the same SDK
worker. When the execution time skew is tied to specific keys (stateful
processing), it further aggravates the issue.

I started experimenting with the ability to schedule bundles on any
available worker. Initially I'm trying a very basic approach, starting
processing of a bundle only on a free environment (one that does not
process any other bundle). This effectively removes the pipelining between
subtask and SDK worker. Potentially waiting for an available environment is
acceptable in this case, as the per bundle overhead is very small compared
to the per record cost.

However, even if this suffices for the use case I'm looking at, this is an
area that will probably need more work going forward. Rather than the
runner guessing how to schedule bundles, I think that the best long term
solution would be SDF, where the user code can decide that something takes
too long and defer remaining work (and the runner can redistribute it).
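
As a rough illustration of that direction, here is a splittable DoFn sketch in
the Java SDK API (the pipelines in this thread are Python, the work sizes are
assumed, and this only shows SDK-initiated deferral): the process method stops
claiming work once a time budget is exceeded and hands the rest of its
restriction back to the runner.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Illustrative splittable DoFn: return unprocessed work to the runner when
 * per-record processing turns out to be slow, so it can be redistributed
 * rather than blocking this worker.
 */
class TimeBudgetedFn extends DoFn<String, String> {
  private static final Duration BUDGET = Duration.standardSeconds(10);

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String element) {
    // Hypothetical: 100 units of expensive work per input element.
    return new OffsetRange(0, 100);
  }

  @ProcessElement
  public ProcessContinuation process(
      ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
    Instant deadline = Instant.now().plus(BUDGET);
    for (long pos = tracker.currentRestriction().getFrom();
        tracker.tryClaim(pos);
        pos++) {
      context.output(expensiveCall(context.element(), pos));
      if (Instant.now().isAfter(deadline)) {
        // Defer the remainder of the restriction; the runner can reschedule it.
        return ProcessContinuation.resume();
      }
    }
    return ProcessContinuation.stop();
  }

  private String expensiveCall(String element, long pos) {
    return element + ":" + pos; // placeholder for the real model invocation
  }
}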

Curious if anyone else has run into this issue yet and what other ideas
there may be?

Thanks,
Thomas