Hello Arvid,
thank you for your reply.

Actually, using a window to aggregate the events over a time period is not
applicable to my case, since I need the records to be processed immediately.
Even if it were, I still cannot see how I would forward the aggregated events
to, let's say, 2 parallel operators, since the slot assignment of the KeyGroup
is done by Flink. Do you mean keying by a different property again, so that
the previously aggregated events get redistributed across the operators?
I apologize if my question is naive, but I got a little confused.
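[Editor's note: the two-phase aggregation Arvid describes (pre-aggregate on a refined key, then combine on the original key) can be illustrated outside Flink with plain Java. Everything here — the salted-key layout, the round-robin salt, the sample user IDs — is a made-up sketch, not the actual Flink operators:]

```java
import java.util.HashMap;
import java.util.Map;

public class TwoStageAggregation {
    // Stage 1: pre-aggregate per (user, salt), so a hot user's events can be
    // handled by several parallel subtasks instead of landing on just one.
    // Stage 2: strip the salt (i.e. re-key by user alone) and sum the partials.
    public static Map<String, Long> countPerUser(String[] userIds, int salts) {
        Map<String, Long> partial = new HashMap<>();       // key: user + "|" + salt
        int i = 0;
        for (String user : userIds) {
            String saltedKey = user + "|" + (i++ % salts); // round-robin salt
            partial.merge(saltedKey, 1L, Long::sum);
        }
        // Second keyBy: drop the salt and combine the partial aggregates.
        Map<String, Long> total = new HashMap<>();
        partial.forEach((k, v) ->
                total.merge(k.substring(0, k.indexOf('|')), v, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        // "bot1" is the hot key: its 4 events were split across 2 salted keys
        // in stage 1, yet the final count per user is still correct.
        String[] events = {"bot1", "bot1", "bot1", "bot1", "alice", "bob"};
        System.out.println(countPerUser(events, 2));
    }
}
```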




On Mon, Apr 4, 2022 at 10:38 AM, Arvid Heise <ar...@apache.org>
wrote:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, suppose you want to count events per user, and 2 users are
> actually bots spamming lots of events, accounting for 50% of all events.
> Then you will always collect all events of each bot on one machine, which
> limits scalability. You can, however, first aggregate all events per user
> per day (or subdivide in some other way). Then the same bot can be
> processed in parallel, and afterwards you do an overall aggregation.
>
> If that's not possible, then your problem itself limits the scalability,
> and you can only try to keep the two bot users off the same machine (which
> can happen, as in Qingsheng's point 2 below). Then you can simply try to
> shift the keys by appending constants to them and check whether the
> distribution looks better. Have a look at KeyGroupRangeAssignment [1] to
> test that out without running Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
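[Editor's note: a minimal sketch of checking a key distribution offline, along the lines Arvid suggests. The two formulas — keyGroup = murmurHash(key.hashCode()) % maxParallelism and operatorIndex = keyGroup * parallelism / maxParallelism — follow Flink's documented assignment; the `scramble` function below is only a stand-in (the murmur3 finalizer), so for exact key-group numbers use the linked KeyGroupRangeAssignment class itself:]

```java
public class KeyDistributionCheck {
    // Stand-in bit scrambler (murmur3 finalizer). Flink actually uses
    // MathUtils.murmurHash; swap that in for exact results.
    static int scramble(int h) {
        h ^= h >>> 16; h *= 0x85ebca6b;
        h ^= h >>> 13; h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // keyGroup = hash % maxParallelism; operatorIndex = keyGroup * parallelism / maxParallelism
    static int operatorIndexFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = ((scramble(key.hashCode()) % maxParallelism) + maxParallelism)
                % maxParallelism;                 // normalize to [0, maxParallelism)
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // Mirrors the original setup: parallelism 14, maxParallelism 14 * 14.
        int parallelism = 14, maxParallelism = 196;
        int[] load = new int[parallelism];
        for (long accountId = 1; accountId <= 10_000; accountId++) {
            load[operatorIndexFor(accountId, maxParallelism, parallelism)]++;
        }
        for (int i = 0; i < parallelism; i++) {
            System.out.printf("subtask %2d -> %d keys%n", i, load[i]);
        }
    }
}
```

Feeding your real accountId values through such a loop shows immediately whether a few subtasks receive most of the keys, without deploying a job.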
>
> On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou <akis3...@gmail.com>
> wrote:
>
>> Hello Qingsheng,
>>
>> Thank you very much for your answer.
>>
>> I will try to modify the key as you mentioned in your first assumption.
>> In case the second assumption also holds, what would you propose to
>> remedy the situation? Experimenting with different values of max
>> parallelism?
>>
>>
>> On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren <renqs...@gmail.com>
>> wrote:
>>
>>> Hi Isidoros,
>>>
>>> Two assumptions in my mind:
>>>
>>> 1. Records are not evenly distributed across different keys, e.g. some
>>> accountIds simply have more events than others. If the record distribution
>>> is predictable, you can try to combine other fields or include more
>>> information in the key field to help balance the distribution.
>>>
>>> 2. The keys themselves are not distributed evenly. In short, the subtask
>>> ID that a key belongs to is calculated as murmurHash(key.hashCode()) %
>>> maxParallelism, so if the distribution of keys is unusual, it's possible
>>> that most keys land in the same subtask under this algorithm. AFAIK there
>>> isn't a built-in metric for monitoring the number of keys per subtask, but
>>> I think you can simply investigate it with a map function after keyBy.
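[Editor's note: the key histogram both replies suggest can be sketched in plain Java over a sample of accountIds. In a real job the same counting could live in a map function after keyBy, as Qingsheng describes; the class and method names here are made up for illustration:]

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeyHistogram {
    // Count events per key and return the heaviest keys first; a couple of
    // keys dominating the histogram is the skew scenario from assumption 1.
    public static LinkedHashMap<Long, Long> topKeys(List<Long> accountIds, int limit) {
        return accountIds.stream()
                .collect(Collectors.groupingBy(id -> id, Collectors.counting()))
                .entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                .limit(limit)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Long> sample = List.of(7L, 7L, 7L, 42L, 42L, 99L);
        System.out.println(topKeys(sample, 2)); // prints {7=3, 42=2}
    }
}
```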
>>>
>>> Hope this would be helpful!
>>>
>>> Qingsheng
>>>
>>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou <akis3...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > We run a Flink application, version 1.13.2, that consists of a Kafka
>>> source with one partition so far.
>>> > We then filter the data based on some conditions, map it to POJOs, and
>>> transform it into a KeyedStream keyed by an accountId long property of the
>>> POJO. The downstream operators are 10 CEP operators that run with a
>>> parallelism of 14, and maxParallelism is set to (operatorParallelism *
>>> operatorParallelism).
>>> > As you can see in the attached image, the events are distributed
>>> unevenly, so some subtasks are busy while others are idle.
>>> > Is there any way to distribute the load evenly across the subtasks?
>>> Thank you in advance.
>>> > <Capture.PNG>
>>> >
>>>
>>>
