Hi Vishal,

Reactive mode adjusts the parallelism of tasks to match the number of slots
available in the cluster. It does not allocate new workers automatically. [1]
1. Max parallelism only sets the upper bound to which a task's parallelism
can be scaled up. It does not affect how tasks are scheduled.
2. Flink enables slot sharing by default. Splitting the tasks into two slot
sharing groups will make the job need more slots in total.
3. Are you sure there are 20 task managers running? Could you share a
screenshot of your job UI?
4. That depends on whether your data has hot keys. If all records for a hot
key are handled by the same subtask, that subtask will be overloaded.
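
To illustrate point 4, here is a minimal Python sketch of how a single hot
key overloads one subtask even when all other keys are spread by a hash.
The crc32-modulo assignment is only a stand-in for Flink's key-group
hashing, and the key names, record counts and parallelism are made up.

```python
import zlib
from collections import Counter
from typing import Iterable, List


def subtask_for(key: str, parallelism: int) -> int:
    # Stand-in for Flink's key-group assignment (assumption, not Flink's hash).
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys: Iterable[str], parallelism: int) -> List[int]:
    # Count how many records each subtask index receives.
    counts = Counter(subtask_for(k, parallelism) for k in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


parallelism = 8  # hypothetical parallelism of the keyed operator

# 1000 records with distinct keys, then the same plus 1000 records of one key.
uniform = [f"user-{i}" for i in range(1000)]
skewed = uniform + ["hot-user"] * 1000

uniform_load = load_per_subtask(uniform, parallelism)
skewed_load = load_per_subtask(skewed, parallelism)
hot_subtask = subtask_for("hot-user", parallelism)

print("uniform keys:", uniform_load)
print("with hot key:", skewed_load)
print("subtask receiving the hot key:", hot_subtask)
```

Every record of the hot key lands on the same subtask, so that subtask gets
at least 1000 extra records no matter how many task managers are added.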

Simply giving this job more task managers and slots may relieve the
heavy operators.
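
As a rough check of how many slots a job needs (see point 2): with slot
sharing and fixed operator parallelisms, each slot sharing group needs as
many slots as the highest parallelism among its operators, and the job
needs the sum over all groups. A sketch with made-up parallelisms;
required_slots is a hypothetical helper, not a Flink API.

```python
from typing import Dict, List


def required_slots(groups: Dict[str, List[int]]) -> int:
    # Each slot sharing group needs max(parallelism) slots; groups add up.
    return sum(max(parallelisms) for parallelisms in groups.values())


# Hypothetical job: two light operators (parallelism 4) and one heavy (16).
one_group = {"default": [4, 4, 16]}
two_groups = {"default": [4, 4], "heavy": [16]}

print(required_slots(one_group))   # 16
print(required_slots(two_groups))  # 20
```

Moving the heavy operator into its own group raises the total from 16 to
20 slots here, which is why splitting groups does not free up slots.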


[1] https://flink.apache.org/2021/05/06/reactive-mode.html

Best,
Weihua


On Wed, Jun 29, 2022 at 8:56 PM Vishal Surana <[email protected]> wrote:

> I have a job which has about 10 operators, 3 of which are heavy weight. I
> understand that the current implementation of autoscaling gives more or
> less no configurability besides max parallelism. That is practically
> useless as the operators I have will inevitably choke if one of the 3 ends
> up with insufficient slots. I have explored the following:
>
>    1. Set very high max parallelism for the most heavy weight operator
>    with the hope that flink can use this signal to allocate subtasks. But this
>    doesn't work
>    2. I used slot sharing to group 2 of the 3 operators and created a
>    slot sharing group for just the other one with the hope that it will
>    free up more slots. Both of these are stateful operators with RocksDB being
>    the state backend. However despite setting the same slot sharing group
>    name, they're scheduled independently and each of the three (successive)
>    operators end up with the exact same parallelism no matter how many task
>    managers are running. I say slot sharing doesn't work because if it did,
>    there would have been more available slots. It is curious that flink ends
>    up allocating an identical number of slots to each.
>    3. When slot sharing is enabled, my other jobs are able to work with
>    very few slots. In this job, I see the opposite. For instance, if I spin up
>    20 task managers each with 16 slots, then there are 320 available slots.
>    However once the job starts, the job itself says ~275 slots are used and
>    the number of available slots in the GUI is 0. I have verified that 275 is
>    the correct number by examining the number of subtasks of each operator.
>    How can that be? Where are the remaining slots?
>    4. While the data is partitioned by a hash function that ought to more
>    or less distribute data randomly across operators, I can see that some
>    operators are overloaded while others aren't. Does flink try to avoid
>    uniformly distributing load for any reason, possibly to reduce network? Is
>    there a way to disable such a feature?
>
> I'm running flink version 1.13.5 but I didn't see any related change in
> recent versions of flink.
>
> Thanks a lot!
>
> --
> Vishal
>
