Currently, nobody is working on ensuring that key groups are spread as evenly as possible. As a workaround, I would suggest increasing the number of key groups (i.e. the max parallelism) or changing the key function.
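To make the workaround a bit more concrete, here is a minimal, self-contained sketch of how a key ends up on a parallel subtask via its key group, and how the per-subtask key count can be inspected offline. It is a simplified model, not Flink's code: the `mix` function is a stand-in hash (Flink applies its own murmur hash to `key.hashCode()`), the class and method names are hypothetical, and the page names are made-up sample keys. The key-group-to-subtask formula follows my understanding of `KeyGroupRangeAssignment`, so please verify it against the Flink version you run.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of key -> key group -> subtask assignment (illustration only). */
final class KeyGroupSkew {

    // Stand-in integer mixer (murmur3 32-bit finalizer). ASSUMPTION: not Flink's actual hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Key -> key group: hash the key's hashCode, then reduce modulo the number of key groups.
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(mix(key.hashCode()), numKeyGroups);
    }

    // Key group -> subtask: each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int numKeyGroups, int parallelism) {
        return keyGroup * parallelism / numKeyGroups;
    }

    // Count how many of the given keys each subtask would own.
    static int[] keysPerSubtask(List<?> keys, int numKeyGroups, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            int keyGroup = keyGroupFor(key, numKeyGroups);
            counts[subtaskFor(keyGroup, numKeyGroups, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys; replace with the keys of your own job.
        List<String> pages =
                Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
        // Compare the distribution for different numbers of key groups at parallelism 2.
        for (int numKeyGroups : new int[] {2, 128, 4096}) {
            System.out.println(numKeyGroups + " key groups -> keys per subtask: "
                    + Arrays.toString(keysPerSubtask(pages, numKeyGroups, 2)));
        }
    }
}
```

With only a handful of distinct keys the outcome is essentially a coin toss per key, so a larger number of key groups does not guarantee an even split; it only removes the skew that comes from key groups of different sizes. Changing the key function (for example, keying on a different or derived field) changes `key.hashCode()` and therefore the whole assignment.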

Cheers,
Till

On Wed, Oct 23, 2019 at 1:42 PM Piotr Nowojski wrote:

> Hi,
>
> This is a known issue of Flink. For example, key group sizes can differ by
> +/- 1, and key groups are currently distributed randomly across the
> cluster, so some machines will get more keys to handle than others. If the
> number of keys is relatively small, like 3 keys per key group, the load
> difference can be quite large (some machines may get almost only key groups
> of size 2 while others get mostly key groups of size 3, resulting in a 50%
> load difference).
>
> Unfortunately, I don't know of any concrete plans to address it. Maybe
> Till will know something more (I CC'ed him).
>
> Also, I don't think it's exposed via a metric anywhere.
>
> Piotrek
>
> On 22 Oct 2019, at 10:00, Flavio Pompermaier wrote:
>
> Hi to all,
> I was looking into the example from the Flink training, trying to
> understand why in the ClickEventCount[1] one task manager was reading at
> twice the speed of the other.
>
> I had to debug a lot of Flink's internal code to understand that it
> depends on the adopted hash function (used by Flink to assign keys to
> task managers), which was assigning 4 keys to one TM and 2 to the other.
> Is there a smarter way to monitor this (e.g. a metric like
> taskManager_numKeys)?
>
> I also discovered that one cannot control how keys are partitioned across
> task managers (i.e. use keyBy after a customPartition). Is there any
> development effort in this direction?
>
> Best,
> Flavio
>
> [1]
> https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
