Hi Alexis,

> Is it possible to have different values of max parallelism in different 
> operators?
Yes, it is possible; please refer to [1] and [2] for the API details.

> I did a test in which my source had a max parallelism of 3, whereas a 
> downstream operator had a (non-max) parallelism explicitly set to 4, and the 
> job could not be started. Could this related to operator chaining? Or maybe 
> the whole job ended up with a max parallelism of 3 because I didn't set it 
> and it took the value from the source?
Could you share the error details? My guess is that the downstream
operator inherited the max parallelism of its upstream operator, so
its parallelism of 4 exceeded the max of 3.
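
To illustrate that guess, here is a minimal sketch of the kind of
check I have in mind. It is not Flink code: the class and both helper
methods are hypothetical, and the fallback to an inherited value is
only my assumption about what happened in your test.

```java
public class ParallelismCheckSketch {

    // Hypothetical helper: the max parallelism an operator ends up with,
    // ASSUMING it falls back to an inherited value when it has none of
    // its own (ownMax <= 0 means "not set" in this sketch).
    public static int effectiveMaxParallelism(int ownMax, int inheritedMax) {
        return ownMax > 0 ? ownMax : inheritedMax;
    }

    // An operator can only run if its parallelism fits within its max.
    public static boolean canStart(int parallelism, int effectiveMax) {
        return parallelism <= effectiveMax;
    }

    public static void main(String[] args) {
        // Downstream operator: parallelism 4, no max of its own, upstream max 3.
        int effectiveMax = effectiveMaxParallelism(-1, 3);
        System.out.println("effective max parallelism: " + effectiveMax);
        System.out.println("can start with parallelism 4: " + canStart(4, effectiveMax));
    }
}
```

If that is indeed the cause, setting the max parallelism explicitly
on the downstream operator should avoid the problem.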

> Additionally, the documentation states that, in reactive mode, only max 
> parallelism is taken into account, so if I want to limit the number of 
> parallel instances of my sources and sinks, I'd have to set their max 
> parallelism, and that would be different from that of the rest of the 
> operators.
> Moreover, is it possible to switch a job from non-reactive to reactive mode 
> via savepoints? What happens if my max parallelism settings change during the 
> switch? For example, to limit my sink to a single instance.
No, max parallelism cannot be changed when restoring from a
savepoint, because the state in the savepoint is distributed
according to it.
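
To make that concrete, here is a simplified sketch of how keyed state
is bucketed into key groups. It is not Flink's actual code: the class
and method names are made up for illustration, and Flink additionally
applies a murmur hash to the key's hashCode() before taking the
modulo.

```java
public class KeyGroupSketch {

    // Simplified stand-in for key-group assignment: the number of key
    // groups equals the max parallelism. (Flink also murmur-hashes the
    // hash code first; that step is omitted here.)
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the index of the parallel subtask that owns it.
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int[] keys = {40, 70, 100};
        for (int key : keys) {
            // Changing the max parallelism can move a key to another key group.
            System.out.printf("key=%d keyGroup(max=32)=%d keyGroup(max=48)=%d%n",
                    key, keyGroupFor(key, 32), keyGroupFor(key, 48));

            // Changing only the parallelism keeps the key group and just
            // reassigns whole key groups to subtasks.
            int group = keyGroupFor(key, 32);
            System.out.printf("  keyGroup=%d subtask(p=4)=%d subtask(p=8)=%d%n",
                    group, subtaskFor(group, 32, 4), subtaskFor(group, 32, 8));
        }
    }
}
```

State in a savepoint is stored per key group, so rescaling only moves
whole key groups between subtasks, whereas a different max parallelism
would change which key group a key belongs to.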

> In summary, for a hypothetical pipeline that basically does something like: 
> source (parallelism between 1 & 3) -> stateful operator (parallelism between 
> 1 & 32) -> sink (parallelism exactly 1 always)
> what should I do regarding max parallelism (both for execution env an
> operators) in normal mode, what should I do in reactive mode, and can
> I switch between modes with savepoints?
I'm assuming that the stream is keyed (for non-keyed operators, max
parallelism doesn't make much sense).
I think you can use the same max parallelism settings in both modes
and switch safely. In your example, each operator has a different max
parallelism (source: 3 -> stateful operator: 32 -> sink: 1), so it
should be configured at the operator level. You'll probably want to
explore a higher max parallelism to get more efficient state
distribution and rescaling.
In normal mode, you can additionally set the parallelism manually.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#setMaxParallelism-int-
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/api/dag/Transformation.html#setMaxParallelism-int-

Regards,
Roman

On Thu, Mar 3, 2022 at 11:34 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi everyone,
>
> I have some questions regarding max parallelism and how interacts with 
> deployment modes. The documentation states that max parallelism should be 
> "set on a per-job and per-operator granularity" but doesn't provide more 
> details. Is it possible to have different values of max parallelism in 
> different operators? I did a test in which my source had a max parallelism of 
> 3, whereas a downstream operator had a (non-max) parallelism explicitly set 
> to 4, and the job could not be started. Could this related to operator 
> chaining? Or maybe the whole job ended up with a max parallelism of 3 because 
> I didn't set it and it took the value from the source?
>
> Additionally, the documentation states that, in reactive mode, only max 
> parallelism is taken into account, so if I want to limit the number of 
> parallel instances of my sources and sinks, I'd have to set their max 
> parallelism, and that would be different from that of the rest of the 
> operators.
>
> Moreover, is it possible to switch a job from non-reactive to reactive mode 
> via savepoints? What happens if my max parallelism settings change during the 
> switch? For example, to limit my sink to a single instance.
>
> In summary, for a hypothetical pipeline that basically does something like: 
> source (parallelism between 1 & 3) -> stateful operator (parallelism between 
> 1 & 32) -> sink (parallelism exactly 1 always)
> what should I do regarding max parallelism (both for execution env an 
> operators) in normal mode, what should I do in reactive mode, and can I 
> switch between modes with savepoints?
>
> Regards,
> Alexis.
>
