Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-24 Thread Kenneth Knowles
FWIW I think parallelism is close enough to a resource. If you phrased it
like "how many CPUs can work independently" it is more closely related to
resources. Just like how many bits it takes to encode something is a
semantic property, but "RAM" is a resource.

I think a big role of resource hints is to be a bridge between the Beam
Model, which tries hard to include only essential information, and a
particular implementation, which may not be able to autotune various
inessential implementation details. Specifying parallelism to a runner that
still requires manual tuning of it seems like a fine use of this.

Kenn

On Fri, Apr 21, 2023 at 11:30 AM Jan Lukavský wrote:

> Absolutely agree this is not something that should be part of the model.
> ResourceHints is a good place, but given how a Pipeline might get fused
> (and though this might be under the control of a runner, basically all
> runners use the same code, because there is currently no reason why this
> should be runner-specific), there is a problem with how to resolve
> conflicting settings. Also it is somewhat questionable if parallelism is a
> "resource". It feels more like a runtime property. I tend to think that
> FlinkPipelineOptions could be a good place for that, because this seems to
> apply (mostly) to the Flink batch runner.
> On 4/21/23 19:43, Robert Bradshaw via dev wrote:
>
> +1 to not requiring details like this in the Beam model. There is,
> however, the question of how to pass such implementation-detail specific
> hints to a runner that requires them. Generally that's done via
> ResourceHints or annotations, and while the former seems a good fit it's
> primarily focused on setting up the right context for user code (which GBK
> is not).
>
> A complete hack is to add an experiment like
> flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
> cleaner.
>
>
> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user wrote:
>
>> Hi Jan,
>>
>> To generalize the per-stage parallelism configuration, we should have a
>> FR proposing the capability to explicitly set autoscaling (in this case,
>> fixed size per stage) policy in Beam pipelines.
>>
>> Per-step or per-stage parallelism, or fusion/optimization is not part of
>> the Beam model. They are [Flink] runner implementation details and should
>> be configured for each runner.
>>
>> Also, when building the pipeline, it's not clear what the fusion looks
>> like until the pipeline is submitted to a runner, thus making configuration
>> of the parallelism/worker-per-stage not straightforward.
>> Flink documents its parallelism settings, but it's still kind of a black
>> box since you don't really know how many tasks are actually spawned until
>> you run a pipeline.
>>
>> That being said, if we have a general interface controlling how a
>> pipeline scales, each runner could adapt [auto]scaling in their own way.
>> For example, in a Flink job, each operator/stage's task slot is prorated
>> by their key numbers; the maximum parallelism is throttled by task slot
>> utilization.
>> Another example, in a Dataflow job, each stage horizontally scales by CPU
>> utilization; vertically scales by memory/disk utilization.
>>
>> +dev@beam.apache.org 
>> Let's use this thread to discuss how to configure a pipeline for runners
>> so that they can scale workers appropriately without exposing
>> runner-specific details to the Beam model.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský  wrote:
>>
>>> Hi Ning,
>>>
>>> I might have missed that in the discussion, but we talk about batch
>>> execution, am I right? In streaming, all operators (PTransforms) of a
>>> Pipeline are run in the same slots, thus the downsides are limited. You can
>>> enforce streaming mode using --streaming command-line argument. But yes,
>>> this might have other implications. For batch only it obviously makes sense
>>> to limit parallelism of a (fused) 'stage', which is not a transform-level
>>> concept, but rather a more complex union of transforms divided by shuffle
>>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>>> list for this for deeper discussion?
>>>
>>>  Jan
>>> On 4/20/23 19:18, Ning Kang via user wrote:
>>>
>>> Hi Jan,
>>>
>>> The approach works when your pipeline doesn't have too many operators.
>>> And the operator that needs the highest parallelism can only use at most
>>> #total_task_slots / #operators resources available in the cluster.
>>>
>>> Another downside is wasted resources for other smaller operators who
>>> cannot make full use of task slots assigned to them. You might see only
>>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>>
>>> One redeeming method is that, for operators following another operator
>>> with high fanout, we can explicitly add a R

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Jan Lukavský
Absolutely agree this is not something that should be part of the model.
ResourceHints is a good place, but given how a Pipeline might get fused
(and though this might be under the control of a runner, basically all
runners use the same code, because there is currently no reason why this
should be runner-specific), there is a problem with how to resolve
conflicting settings. Also it is somewhat questionable if parallelism is
a "resource". It feels more like a runtime property. I tend to think
that FlinkPipelineOptions could be a good place for that, because this
seems to apply (mostly) to the Flink batch runner.
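
To make the conflict concrete, here is a minimal Python sketch. It is not Beam code: the Transform tuple, the transform names, the per-transform parallelism hint, and the "take the largest hint" policy are all assumptions made for illustration. It fuses a linear pipeline into stages at shuffle boundaries and then has to pick a single parallelism per stage, even when the fused transforms carry different hints.

```python
from typing import NamedTuple, Optional


class Transform(NamedTuple):
    name: str
    is_shuffle: bool                        # True for GroupByKey-like shuffle barriers
    parallelism_hint: Optional[int] = None  # hypothetical per-transform hint


def fuse_into_stages(transforms):
    """Split a linear pipeline into stages; each shuffle starts a new stage."""
    stages, current = [], []
    for t in transforms:
        if t.is_shuffle and current:
            stages.append(current)
            current = []
        current.append(t)
    if current:
        stages.append(current)
    return stages


def resolve_stage_parallelism(stage, default):
    """One possible policy (an assumption): use the largest hint in the stage."""
    hints = [t.parallelism_hint for t in stage if t.parallelism_hint is not None]
    return max(hints) if hints else default


pipeline = [
    Transform("Read", False),
    Transform("ExtractFeatures", False, 32),
    Transform("FilterEmpty", False, 4),  # conflicts with ExtractFeatures once fused
    Transform("GroupByKey", True, 8),
    Transform("Score", False),
]
stages = fuse_into_stages(pipeline)
resolved = [resolve_stage_parallelism(s, default=2) for s in stages]
```

Here ExtractFeatures and FilterEmpty end up in the same stage with hints of 32 and 4, so some rule has to win. Taking the maximum is only one choice; a runner could just as well take the minimum or reject the pipeline.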


On 4/21/23 19:43, Robert Bradshaw via dev wrote:
+1 to not requiring details like this in the Beam model. There is, 
however, the question of how to pass such implementation-detail 
specific hints to a runner that requires them. Generally that's done 
via ResourceHints or annotations, and while the former seems a good 
fit it's primarily focused on setting up the right context for user 
code (which GBK is not).


A complete hack is to add an experiment like 
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do 
something cleaner.



On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user wrote:


Hi Jan,

To generalize the per-stage parallelism configuration, we should
have a FR proposing the capability to explicitly set autoscaling
(in this case, fixed size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not
part of the Beam model. They are [Flink] runner implementation
details and should be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion
looks like until the pipeline is submitted to a runner, thus
making configuration of the parallelism/worker-per-stage not
straightforward.
Flink documents its parallelism settings, but it's still kind of a
black box since you don't really know how many tasks are actually
spawned until you run a pipeline.

That being said, if we have a general interface controlling how a
pipeline scales, each runner could adapt [auto]scaling in their
own way.
For example, in a Flink job, each operator/stage's task slot is
prorated by their key numbers; the maximum parallelism is
throttled by task slot utilization.
Another example, in a Dataflow job, each stage horizontally scales
by CPU utilization; vertically scales by memory/disk utilization.

+dev@beam.apache.org 
Let's use this thread to discuss how to configure a pipeline for
runners so that they can scale workers appropriately without
exposing runner-specific details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský wrote:

Hi Ning,

I might have missed that in the discussion, but we talk about
batch execution, am I right? In streaming, all operators
(PTransforms) of a Pipeline are run in the same slots, thus
the downsides are limited. You can enforce streaming mode
using --streaming command-line argument. But yes, this might
have other implications. For batch only it obviously makes
sense to limit parallelism of a (fused) 'stage', which is not
a transform-level concept, but rather a more complex union of
transforms divided by shuffle barrier. Would you be willing to
start a follow-up thread in @dev mailing list for this for
deeper discussion?

 Jan

On 4/20/23 19:18, Ning Kang via user wrote:

Hi Jan,

The approach works when your pipeline doesn't have too many
operators. And the operator that needs the highest
parallelism can only use at most #total_task_slots /
#operators resources available in the cluster.

Another downside is wasted resources for other smaller
operators who cannot make full use of task slots assigned to
them. You might see only 1/10 tasks running while the other
9/10 tasks idle for an operator with parallelism 10,
especially when it's doing some aggregation like a SUM.
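
Both downsides can be put into numbers with a small Python sketch. The figures are made up, the even split of slots follows the bound stated above, and crc32-based hash partitioning is only a stand-in assumption for how keys are assigned to tasks, not Flink's actual key-group assignment.

```python
import zlib


def max_parallelism_per_operator(total_task_slots, num_operators):
    """Upper bound from the message: slots are split evenly across operators."""
    return total_task_slots // num_operators


def busy_tasks(keys, parallelism):
    """Count tasks that receive at least one key under hash partitioning."""
    return len({zlib.crc32(k.encode("utf-8")) % parallelism for k in keys})


# First downside: 100 slots shared by 10 operators caps each operator at 10.
slots_per_operator = max_parallelism_per_operator(total_task_slots=100, num_operators=10)

# Second downside: a global SUM has a single key, so with parallelism 10
# only one task gets any data and the other nine sit idle.
busy = busy_tasks(["global_sum"], parallelism=10)
idle = 10 - busy
```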

One redeeming method is that, for operators following another
operator with high fanout, we can explicitly add a Reshuffle
to allow a higher parallelism. But this circles back to the
first downside: if your pipeline has exponentially high
fanout through it, setting a single parallelism for the whole
pipeline is not ideal because it limits the scalability of
your pipeline significantly.

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský wrote:

Hi,

this topic was discussed many years ago and the
conclusion there was that setting the parallelism of
  

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Robert Bradshaw via dev
+1 to not requiring details like this in the Beam model. There is, however,
the question of how to pass such implementation-detail specific hints to a
runner that requires them. Generally that's done via ResourceHints or
annotations, and while the former seems a good fit it's primarily focused
on setting up the right context for user code (which GBK is not).

A complete hack is to add an experiment like
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
cleaner.
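
For illustration only, a Python sketch of how such an experiment string could be parsed. flink_parallelism_for_stage is the hypothetical name from the paragraph above, not an existing Beam experiment, and parse_stage_parallelism and the stage names are likewise made up for this example.

```python
PREFIX = "flink_parallelism_for_stage="  # hypothetical experiment name from this thread


def parse_stage_parallelism(experiments):
    """Return {stage_name: parallelism} for every matching experiment string."""
    overrides = {}
    for experiment in experiments:
        if not experiment.startswith(PREFIX):
            continue  # unrelated experiment, ignore it
        # Split on the last colon so stage names may themselves contain colons.
        stage_name, sep, value = experiment[len(PREFIX):].rpartition(":")
        if not sep or not stage_name:
            raise ValueError(f"expected STAGE_NAME:value, got {experiment!r}")
        parallelism = int(value)  # raises ValueError on non-numeric input
        if parallelism < 1:
            raise ValueError(f"parallelism must be positive, got {parallelism}")
        overrides[stage_name] = parallelism
    return overrides


overrides = parse_stage_parallelism([
    "some_other_experiment",
    "flink_parallelism_for_stage=ExtractFeatures:32",
    "flink_parallelism_for_stage=GroupFeatures:4",
])
```

The sketch also shows why this is a hack: stage names are only known after fusion, so the user would have to guess or look them up from a previous run.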


On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user wrote:

> Hi Jan,
>
> To generalize the per-stage parallelism configuration, we should have a FR
> proposing the capability to explicitly set autoscaling (in this case, fixed
> size per stage) policy in Beam pipelines.
>
> Per-step or per-stage parallelism, or fusion/optimization is not part of
> the Beam model. They are [Flink] runner implementation details and should
> be configured for each runner.
>
> Also, when building the pipeline, it's not clear what the fusion looks
> like until the pipeline is submitted to a runner, thus making configuration
> of the parallelism/worker-per-stage not straightforward.
> Flink documents its parallelism settings, but it's still kind of a black
> box since you don't really know how many tasks are actually spawned until
> you run a pipeline.
>
> That being said, if we have a general interface controlling how a pipeline
> scales, each runner could adapt [auto]scaling in their own way.
> For example, in a Flink job, each operator/stage's task slot is prorated
> by their key numbers; the maximum parallelism is throttled by task slot
> utilization.
> Another example, in a Dataflow job, each stage horizontally scales by CPU
> utilization; vertically scales by memory/disk utilization.
>
> +dev@beam.apache.org 
> Let's use this thread to discuss how to configure a pipeline for runners
> so that they can scale workers appropriately without exposing
> runner-specific details to the Beam model.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský wrote:
>
>> Hi Ning,
>>
>> I might have missed that in the discussion, but we talk about batch
>> execution, am I right? In streaming, all operators (PTransforms) of a
>> Pipeline are run in the same slots, thus the downsides are limited. You can
>> enforce streaming mode using --streaming command-line argument. But yes,
>> this might have other implications. For batch only it obviously makes sense
>> to limit parallelism of a (fused) 'stage', which is not a transform-level
>> concept, but rather a more complex union of transforms divided by shuffle
>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>> list for this for deeper discussion?
>>
>>  Jan
>> On 4/20/23 19:18, Ning Kang via user wrote:
>>
>> Hi Jan,
>>
>> The approach works when your pipeline doesn't have too many operators.
>> And the operator that needs the highest parallelism can only use at most
>> #total_task_slots / #operators resources available in the cluster.
>>
>> Another downside is wasted resources for other smaller operators who
>> cannot make full use of task slots assigned to them. You might see only
>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>
>> One redeeming method is that, for operators following another operator
>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>> parallelism. But this circles back to the first downside: if your pipeline
>> has exponentially high fanout through it, setting a single parallelism for
>> the whole pipeline is not ideal because it limits the scalability of your
>> pipeline significantly.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> this topic was discussed many years ago and the conclusion there was
>>> that setting the parallelism of individual operators via
>>> FlinkPipelineOptions (or ResourceHints) is possible, but would be
>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>> high parallelism for operators with small inputs, does this actually bring
>>> any relevant performance impact? I always use parallelism based on the
>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>> any particular need or measurable impact of such approach?
>>>
>>>  Jan
>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>
>>> Same need here, using Flink runner. We are processing a pcollection
>>> (extracting features per element) then combining these into groups of
>>> features and running the next operator on those groups.
>>>
>>> Each group contains ~50 elements, so the parallelism of the operator
>>> upstream of the groupby should be higher, to be balanced with the
>>> downstream operator.
>>>
>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Ning Kang via dev
Hi Jan,

To generalize the per-stage parallelism configuration, we should have a FR
proposing the capability to explicitly set autoscaling (in this case, fixed
size per stage) policy in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, thus making configuration of
the parallelism/worker-per-stage not straightforward.
Flink documents its parallelism settings, but it's still kind of a black
box since you don't really know how many tasks are actually spawned until
you run a pipeline.

That being said, if we had a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in its own way.
For example, in a Flink job, each operator's or stage's task slots are
prorated by its number of keys; the maximum parallelism is throttled by
task slot utilization.
As another example, in a Dataflow job, each stage scales horizontally by
CPU utilization and vertically by memory/disk utilization.
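
As a rough Python sketch of what such a general interface could look like: FixedSizePolicy and to_runner_flags are hypothetical names, and the mapping is pipeline-wide only, because the per-stage control discussed in this thread has no existing flag. The flags themselves are existing options: --parallelism for the Flink runner, and --num_workers, --max_num_workers and --autoscaling_algorithm for Dataflow. Treating Flink parallelism and Dataflow worker count as equivalent is a simplification.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FixedSizePolicy:
    """Hypothetical runner-agnostic request: run at a fixed size, no autoscaling."""
    size: int


def to_runner_flags(policy, runner):
    """Translate the generic policy into flags for one runner."""
    if policy.size < 1:
        raise ValueError(f"size must be positive, got {policy.size}")
    if runner == "FlinkRunner":
        # Flink: one pipeline-wide parallelism applied to all operators.
        return [f"--parallelism={policy.size}"]
    if runner == "DataflowRunner":
        # Dataflow: pin the worker count and switch autoscaling off.
        return [
            f"--num_workers={policy.size}",
            f"--max_num_workers={policy.size}",
            "--autoscaling_algorithm=NONE",
        ]
    raise ValueError(f"no translation for runner {runner!r}")


flink_flags = to_runner_flags(FixedSizePolicy(8), "FlinkRunner")
dataflow_flags = to_runner_flags(FixedSizePolicy(8), "DataflowRunner")
```

The pipeline would state the policy once and each runner would own the translation, so no runner-specific detail leaks into the Beam model.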

+dev@beam.apache.org 
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we talk about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using --streaming command-line argument. But yes,
> this might have other implications. For batch only it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not a transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators. And
> the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators resources available in the cluster.
>
> Another downside is wasted resources for other smaller operators who
> cannot make full use of task slots assigned to them. You might see only
> 1/10 tasks running while the other 9/10 tasks idle for an operator with
> parallelism 10, especially when it's doing some aggregation like a SUM.
>
> One redeeming method is that, for operators following another operator
> with high fanout, we can explicitly add a Reshuffle to allow a higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it limits the scalability of your
> pipeline significantly.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) is possible, but would be somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually bring any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline and this seems to work just fine. Is there any particular
>> need or measurable impact of such approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using Flink runner. We are processing a pcollection
>> (extracting features per element) then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the groupby should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism for operators, as I mentioned
>>> before, there may be multiple groupby, join operators in one pipeline, and
>>> their parallelism can be different due to different input data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.