Hi Yuval,

Just a couple of comments:


  *   Assuming that all 4 of your different keys are evenly distributed and you send them to (only) 3 buckets, at least one bucket must cover 2 of your keys (pigeonhole principle), hence the 50% you observe
  *   With low-entropy keys, avoiding data skew is quite difficult
  *   But your situation could be worse: all 4 keys could end up in the same bucket if the hash function in use happens to generate collisions for the 4 keys. In that case 2 of your 3 buckets would not process any events, which could also lead to watermarks not progressing (you can check the actual assignment with the first sketch after this list)
  *   There are two proposals on how to improve the situation:
     *   Use the same parallelism and max parallelism for the relevant operators and implement a manual partitioner (see the second sketch after this list)
        *   A manual partitioner is also useful in situations where you want to lower the bias: if you know the distribution of your key space exactly, you can rearrange keys to even out the numbers
     *   More sophisticated (if possible), divide-and-conquer like (see the third sketch after this list):
        *   Key by your ‘small’ key plus some arbitrary attribute with higher entropy
        *   Window-aggregate first on that artificial key
        *   Aggregate the results on your original ‘small’ key
        *   This could be interesting for high-throughput situations where you actually want to run at a parallelism higher than the number of distinct ‘small’ keys
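
To see how easily that happens, you can print the subtask that Flink actually assigns each key to. A minimal sketch, assuming Flink's default max parallelism of 128 and using KeyGroupRangeAssignment from flink-runtime (the class name KeyGroupCheck is illustrative):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        // Flink picks the key group as murmurHash(key.hashCode()) % maxParallelism
        // and then maps key groups onto the parallel subtasks. With only 4
        // distinct keys and 3 subtasks, at least 2 keys must share a subtask.
        int maxParallelism = 128; // Flink's default for small parallelism
        int parallelism = 3;      // the 3 "buckets" from the question
        for (int key = 0; key <= 3; key++) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}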
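
Here is a minimal sketch of the first proposal via DataStream.partitionCustom; the Event type and all names are illustrative, not from your code. Note the caveat in the comments:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ManualPartitionerSketch {

    /** Illustrative event type: only the int key (0..3) matters here. */
    public static class Event {
        public int key;
        public long value;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); // the 3 "buckets" from the question

        DataStream<Event> events = env.fromElements(new Event(), new Event());

        // Route key k straight to subtask (k % numPartitions) instead of going
        // through Flink's murmur-hash key-group assignment.
        Partitioner<Integer> byKeyModulo = (key, numPartitions) -> key % numPartitions;

        DataStream<Event> evenlySpread = events.partitionCustom(
                byKeyModulo,
                new KeySelector<Event, Integer>() {
                    @Override
                    public Integer getKey(Event e) {
                        return e.key;
                    }
                });

        // Caveat: partitionCustom returns a plain DataStream, not a KeyedStream,
        // so downstream operators cannot use keyed state.
        evenlySpread.print();
        env.execute("manual partitioner sketch");
    }
}

If you do need keyed state, the usual workaround is to keep keyBy but map each original key to a surrogate key that is known to land in a distinct key group; that mapping only stays stable when parallelism and max parallelism are pinned to the same value, which is why the proposal pins both.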
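
And a sketch of the divide-and-conquer variant, as a windowed count. The random salt, the fan-out of 16 and the 1-minute windows are illustrative assumptions; processing-time windows are used only to keep the example self-contained (with event time you would also need a watermark strategy):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStageAggregationSketch {

    private static final int SALT_BUCKETS = 16; // illustrative fan-out factor

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> smallKeys = env.fromElements(0, 1, 2, 3, 0, 1); // low-entropy keys

        // Attach a random salt so the few real keys spread over many key groups:
        // the artificial key space now has 4 * 16 = 64 distinct values.
        DataStream<Tuple3<Integer, Integer, Long>> salted = smallKeys
                .map(k -> Tuple3.of(k, ThreadLocalRandom.current().nextInt(SALT_BUCKETS), 1L))
                .returns(Types.TUPLE(Types.INT, Types.INT, Types.LONG));

        // Stage 1: pre-aggregate on the high-entropy artificial key (key, salt).
        DataStream<Tuple3<Integer, Integer, Long>> partials = salted
                .keyBy(t -> t.f0 + "#" + t.f1, Types.STRING)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(2);

        // Stage 2: combine the partial counts on the original 'small' key
        // (the salt field f1 is meaningless after this step).
        DataStream<Tuple3<Integer, Integer, Long>> totals = partials
                .keyBy(t -> t.f0, Types.INT)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(2);

        totals.print();
        env.execute("two-stage aggregation sketch");
    }
}

Stage 1 can now run at a parallelism well above 4, and stage 2 only sees one pre-aggregated record per (key, salt) pair and window, so its skew hardly matters.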

Hope this helps

Thias


From: Yuval Itzchakov <yuva...@gmail.com>
Sent: Wednesday, 3 November 2021 14:41
To: user <user@flink.apache.org>
Subject: Custom partitioning of keys with keyBy

Hi,
I have a use-case where I'd like to partition a KeyedDataStream a bit 
differently from how Flink's default partitioning works with key groups.

[inline image: keyBy code snippet]
What I'd like to be able to do is take all my data and split it up evenly 
between 3 buckets which will store the data in the state. Using the key above 
works, but splits the data unevenly between the different key groups, as the 
key space is usually very small (0-3). What ends up happening is that 
sometimes 50% of the keys land on the same operator index, whereas ideally I'd 
like to distribute them evenly across all operator indexes in the cluster.

Is there any way of doing this?
--
Best Regards,
Yuval Itzchakov.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
