Hello Arvid, thank you for your reply.

Using a window to aggregate the events over a time period is not applicable in my case, since I need the records to be processed immediately. And even if it were, I still cannot understand how I would forward the aggregated events to, let's say, 2 parallel operators, given that the assignment of key groups to subtasks is done by Flink. Do you mean keying the stream a second time by a different property, so that the previously aggregated events get redistributed across the operators, i.e. something like the sketch below? I apologize if my question is naive, but I got a little confused.
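To make sure I understand the two-stage idea, here is a rough sketch of what I think you mean, with placeholder types of my own: events are (accountId, bucket, count) tuples, where the bucket stands in for "per day or any other way to subdivide". Both stages emit on every event, with no window involved:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TwoStageAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input: (accountId, bucket, count).
        DataStream<Tuple3<Long, String, Long>> events = env.fromElements(
                Tuple3.of(1L, "b-0", 1L),
                Tuple3.of(1L, "b-1", 1L),
                Tuple3.of(2L, "b-0", 1L));

        // Stage 1: refined key (accountId + bucket), so a single hot
        // account is spread over several subtasks. Emits a running
        // partial count immediately on every event.
        DataStream<Tuple3<Long, String, Long>> partials = events
                .keyBy(e -> e.f0 + "|" + e.f1)
                .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));

        // Stage 2: key by accountId alone and combine the partials,
        // replacing the previous value of a bucket instead of re-adding it.
        partials
                .keyBy(e -> e.f0)
                .process(new CombinePartials())
                .print();

        env.execute("two-stage aggregation sketch");
    }

    // Keeps the latest partial per bucket and emits (accountId, total).
    static class CombinePartials extends
            KeyedProcessFunction<Long, Tuple3<Long, String, Long>, Tuple2<Long, Long>> {

        private transient MapState<String, Long> latest;

        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("latest-partials", String.class, Long.class));
        }

        @Override
        public void processElement(
                Tuple3<Long, String, Long> partial,
                Context ctx,
                Collector<Tuple2<Long, Long>> out) throws Exception {
            latest.put(partial.f1, partial.f2);
            long total = 0;
            for (long v : latest.values()) {
                total += v;
            }
            out.collect(Tuple2.of(partial.f0, total));
        }
    }
}

Is that roughly the pattern you had in mind for redistributing the hot keys?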
On Mon, Apr 4, 2022 at 10:38 AM, Arvid Heise <ar...@apache.org> wrote:

> You should create a histogram over the keys of the records. If you see a
> skew, one way to go about it is to refine the key or split aggregations.
>
> For example, consider you want to count events per user and 2 users are
> actually bots spamming lots of events, accounting for 50% of all events.
> Then you will always collect all events of each bot on one machine, which
> limits scalability. You can, however, first aggregate all events per user
> per day (or any other way to subdivide). Then the same bot can be
> processed in parallel and you then do an overall aggregation.
>
> If that's not possible, then your problem itself limits the scalability
> and you can only try to not get both bot users on the same machine (which
> can happen in case 2 below). Then you can simply try to shift the key by
> adding constants to it and check if the distribution looks better. Have a
> look at KeyGroupRangeAssignment [1] to test that out without running
> Flink itself.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>
> On Mon, Apr 4, 2022 at 9:25 AM Isidoros Ioannou <akis3...@gmail.com>
> wrote:
>
>> Hello Qingsheng,
>>
>> thank you very much for your answer.
>>
>> I will try to modify the key as you mentioned in your first assumption.
>> In case the second assumption is valid as well, what would you propose
>> to remedy the situation? Try to experiment with different values of max
>> parallelism?
>>
>> On Sat, Apr 2, 2022 at 6:55 AM, Qingsheng Ren <renqs...@gmail.com>
>> wrote:
>>
>>> Hi Isidoros,
>>>
>>> Two assumptions in my mind:
>>>
>>> 1. Records are not evenly distributed across different keys, e.g. some
>>> accountIds just have more events than others. If the record
>>> distribution is predictable, you can try to combine other fields or
>>> include more information in the key field to help balance the
>>> distribution.
>>>
>>> 2. Keys themselves are not distributed evenly. In short, the key group
>>> that a key belongs to is calculated as murmurHash(key.hashCode()) %
>>> maxParallelism, and each subtask is assigned a range of key groups. So
>>> if the distribution of keys is unlucky, it's possible that most keys
>>> drop into the same subtask with the algorithm above. AFAIK there isn't
>>> a metric for monitoring the number of keys in a subtask, but you can
>>> simply investigate it with a map function after keyBy.
>>>
>>> Hope this is helpful!
>>>
>>> Qingsheng
>>>
>>> > On Apr 1, 2022, at 17:37, Isidoros Ioannou <akis3...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > we run a Flink application, version 1.13.2, that consists of a Kafka
>>> > source with one partition so far. We filter the data based on some
>>> > conditions, map it to POJOs, and transform it into a KeyedStream
>>> > based on an accountId long property of the POJO. The downstream
>>> > operators are 10 CEP operators that run with a parallelism of 14,
>>> > and the maxParallelism is set to (operatorParallelism *
>>> > operatorParallelism).
>>> > As you can see in the attached image, the events are distributed
>>> > unevenly, so some subtasks are busy while others are idle.
>>> > Is there any way to distribute the load evenly across the subtasks?
>>> > Thank you in advance.
>>> > <Capture.PNG>
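P.S. Following your pointer to KeyGroupRangeAssignment [1], I put together the small standalone check below to see how keys map to subtasks without running Flink. It is only a sketch under my own assumptions: the generated key range is a stand-in for my real accountIds, the shift constant is the one to experiment with, and the parallelism values mirror my job (14, maxParallelism = 14 * 14):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Offline check of how keys map to operator subtasks. Only a sketch:
// replace the generated keys with real accountIds and vary the shift
// constant to see whether the distribution improves.
public class KeyDistributionCheck {

    public static void main(String[] args) {
        int parallelism = 14;
        int maxParallelism = parallelism * parallelism; // as in my job
        long shift = 0L; // constant added to the key, per Arvid's suggestion

        int[] counts = new int[parallelism];
        for (long accountId = 1; accountId <= 10_000; accountId++) {
            Long key = accountId + shift;
            // Internally this is murmurHash(key.hashCode()) % maxParallelism
            // to get the key group, which is then mapped to a subtask.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            counts[subtask]++;
        }
        for (int i = 0; i < parallelism; i++) {
            System.out.printf("subtask %2d -> %d keys%n", i, counts[i]);
        }
    }
}

If the counts come out skewed, I will try different constants as you suggested and compare.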
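And for Qingsheng's suggestion of investigating with a map function after keyBy, this is roughly what I had in mind: a pass-through rich map that tracks how many distinct keys each subtask has seen. Again a sketch of my own: the key selector would be whatever the preceding keyBy uses (the accountId in my case), and the set lives only in memory for investigation purposes:

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;

// Pass-through map that logs how many distinct keys each subtask sees.
// For investigation only: the set is in-memory and not checkpointed.
public class KeyCountingMap<T, K> extends RichMapFunction<T, T> {

    private final KeySelector<T, K> keySelector;
    private transient Set<K> seenKeys;
    private transient int subtask;

    public KeyCountingMap(KeySelector<T, K> keySelector) {
        this.keySelector = keySelector;
    }

    @Override
    public void open(Configuration parameters) {
        seenKeys = new HashSet<>();
        subtask = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public T map(T value) throws Exception {
        // Log only when this subtask sees a key for the first time.
        if (seenKeys.add(keySelector.getKey(value))) {
            System.out.printf("subtask %d now has %d distinct keys%n",
                    subtask, seenKeys.size());
        }
        return value;
    }
}

I would chain it right after the keyBy on accountId and compare the logged counts across the 14 subtasks.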