I’m not the OP, but in our case we sometimes want data locality. For example, 
suppose we have 100 consumers, each building up a cache of customer-to-data 
mappings. If customer data is spread randomly across all partitions, then a 
query for one customer’s data has to hit all 100 consumers. If customer data 
exhibits some locality, queries for that data only hit a subset of consumers.

Wes


> On May 3, 2016, at 11:18 AM, Tauzell, Dave <dave.tauz...@surescripts.com> 
> wrote:
> 
> Do you need the messages to be ordered in some way?   Why pass a key if you 
> don't want all the messages to go to one partition?
> 
> -Dave
> 
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
> Connect with us: Twitter | LinkedIn | Facebook | YouTube
> 
> 
> -----Original Message-----
> From: Wesley Chow [mailto:w...@chartbeat.com]
> Sent: Tuesday, May 03, 2016 9:51 AM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
> 
> I’ve come up with a couple of solutions, since we too have a power-law 
> distribution. However, we have not put anything into practice.
> 
> Fixed Slicing
> 
> One simple thing to do is to take each key and slice it into some fixed 
> number of partitions. So your function might be:
> 
> (hash(key) + (seq % 10)) % num
> 
> where seq is a per-message counter (or random value), in order to spread each 
> key across 10 partitions. Or:
> 
> hash(key + ‘0’) % num
> hash(key + ‘1’) % num
> …
> hash(key + ‘9’) % num
> 
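> A rough sketch of the salted-key variant as a custom producer Partitioner (the 
> KIP-22 interface); the class name, the fixed SLICES constant, and the use of 
> murmur2 here are illustrative assumptions, not something we actually run:
> 
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.kafka.clients.producer.Partitioner;
> import org.apache.kafka.common.Cluster;
> import org.apache.kafka.common.utils.Utils;
> 
> // Spreads each key over SLICES partitions by salting the key with a
> // per-message counter before hashing.
> public class SlicedPartitioner implements Partitioner {
>     private static final int SLICES = 10;
>     private final AtomicLong seq = new AtomicLong();
> 
>     public void configure(Map<String, ?> configs) {}
> 
>     public int partition(String topic, Object key, byte[] keyBytes,
>                          Object value, byte[] valueBytes, Cluster cluster) {
>         int num = cluster.partitionsForTopic(topic).size();
>         String salted = key + Long.toString(seq.getAndIncrement() % SLICES);
>         return Utils.toPositive(Utils.murmur2(salted.getBytes())) % num;
>     }
> 
>     public void close() {}
> }
> 
> A consumer that wants all of one key’s data then assigns itself the 10 
> partitions given by hash(key + '0') % num … hash(key + '9') % num.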
> 
> Hyperspace Hashing
> 
> If your data is multi-dimensional, then you might find hyperspace hashing 
> useful. I’ll give a simple example, but it’s easy to generalize. Suppose that 
> you have two dimensions you’d like to partition on: customer id (C) and city 
> location (L). You’d like to be able to subscribe to all data for some subset 
> of customers, and you’d also like to be able to subscribe to all data for 
> some subset of locations. Suppose that this data goes into a topic with 256 
> partitions.
> 
> For any piece of data, you’d construct the partition it goes to like so:
> 
> ((hash(C) % 16) << 4) + (hash(L) % 16)
> 
> What that is basically saying is: take C, map it to one of 16 different spots, 
> and set that as the high 4 bits of an 8-bit int. Then take the location, map it 
> to one of 16 different spots, and set that as the lower 4 bits of the int. The 
> resulting number is the partition that piece of data goes to.
> 
> Now if you want one particular C, you subscribe to the 16 partitions that 
> contain that C. If you want some particular L, you subscribe to the 16 
> partitions that contain that L.
> 
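> As a small sketch of the 256-partition example above (customer id in the high 
> 4 bits, location in the low 4 bits), where the method names and the use of 
> String hashCode are just assumptions for illustration:
> 
> class HyperspacePartitioning {
>     // Producer side: compute the partition for one record.
>     static int partitionFor(String customerId, String location) {
>         int c = Math.floorMod(customerId.hashCode(), 16);  // high 4 bits
>         int l = Math.floorMod(location.hashCode(), 16);    // low 4 bits
>         return (c << 4) | l;                               // 0..255
>     }
> 
>     // Consumer side: the 16 partitions that hold everything for one customer.
>     static int[] partitionsForCustomer(String customerId) {
>         int c = Math.floorMod(customerId.hashCode(), 16);
>         int[] parts = new int[16];
>         for (int l = 0; l < 16; l++) parts[l] = (c << 4) | l;
>         return parts;
>     }
> }
> 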
> You can extend this scheme to an arbitrary number of dimensions subject to 
> the number of partitions in the topic, and you can vary the number of bits 
> that any particular dimension takes. This scheme suffers from a combinatorial 
> explosion of partitions if you really want to query on lots of different 
> dimensions, but you can see the Hyperdex paper for clues on how to deal with 
> this.
> 
> 
> Unbalanced Hashing
> 
> It’s easy to generate OK-but-not-great hash functions. One is the DJB hash, 
> which relies on two empirically determined constants:
> 
> http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
> 
> (5381 and 33 in the above example)
> 
> If you can do offline analysis, and your distribution doesn’t change over 
> time, then you can basically exhaustively search for two values that produce 
> a hash function that better distributes the load.
> 
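> For example, a DJB-style hash with both constants exposed, plus an offline 
> scoring function that measures the heaviest partition for a candidate (seed, 
> multiplier) pair; the names and the use of per-key message counts are 
> assumptions for illustration:
> 
> import java.util.Arrays;
> import java.util.Map;
> 
> class DjbSearch {
>     // DJB-style hash; classically seed = 5381 and mult = 33.
>     static int djb(byte[] data, int seed, int mult) {
>         int h = seed;
>         for (byte b : data) h = h * mult + (b & 0xff);
>         return h;
>     }
> 
>     // Score a candidate (seed, mult): the load on the heaviest partition,
>     // given per-key message counts measured offline. Lower is better.
>     static long maxPartitionLoad(Map<String, Long> countsByKey, int numPartitions,
>                                  int seed, int mult) {
>         long[] load = new long[numPartitions];
>         for (Map.Entry<String, Long> e : countsByKey.entrySet()) {
>             int p = Math.floorMod(djb(e.getKey().getBytes(), seed, mult), numPartitions);
>             load[p] += e.getValue();
>         }
>         return Arrays.stream(load).max().getAsLong();
>     }
> }
> 
> An exhaustive (or randomized) search over seed/mult pairs then keeps whichever 
> pair gives the smallest maxPartitionLoad.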
> 
> Greedy Knapsack
> 
> But if you’re OK doing offline analysis and generating your own hash 
> function, then you can create one that’s simply a hard-coded list of mappings 
> for the heaviest keys and defaults to a regular hash for the rest. The 
> easiest way to do this programmatically is to use a greedy algorithm:
> 
>  for each heavy key, k:
>    assign k to the partition with the least assigned weight
> 
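> In Java that greedy pass might look like the following; sorting the heavy keys 
> by descending weight first usually helps, and the class and method names are 
> made up for this sketch:
> 
> import java.util.Comparator;
> import java.util.HashMap;
> import java.util.Map;
> 
> class GreedyAssign {
>     // weightByKey: per-key message counts measured offline for the heavy keys.
>     // Returns a key -> partition map that a custom partitioner can hard-code;
>     // all other keys fall back to a regular hash.
>     static Map<String, Integer> assign(Map<String, Long> weightByKey, int numPartitions) {
>         long[] load = new long[numPartitions];
>         Map<String, Integer> mapping = new HashMap<>();
>         weightByKey.entrySet().stream()
>             .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
>             .forEach(e -> {
>                 int best = 0;  // partition with the least assigned weight so far
>                 for (int p = 1; p < numPartitions; p++)
>                     if (load[p] < load[best]) best = p;
>                 mapping.put(e.getKey(), best);
>                 load[best] += e.getValue();
>             });
>         return mapping;
>     }
> }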
> 
> The advantage of fixed slicing and hyperspace hashing is that you don’t have 
> to know your distribution a priori, and they generally scale well as you 
> increase the number of keys. The disadvantage is that one key’s data is split 
> across multiple partitions.
> 
> The advantage of unbalanced hashing and greedy knapsack is that you can get 
> close to an optimal partitioning scheme and all of one key’s data resides in 
> one partition. The downside is that you need to manage the partition mapping 
> as your distribution changes over time.
> 
> Hopefully that gives you some ideas!
> 
> Wes
> 
> 
> 
>> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
>> 
>> Hi,
>> 
>> Not sure if this helps, but the way Loggly seems to do it is to have a
>> separate topic for "noisy neighbors". See [1].
>> 
>> [1]
>> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
>> 
>> Cheers,
>> Jens
>> 
>> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
>> 
>>> Hello,
>>> 
>>> Is there a recommendation for handling producer-side partitioning
>>> based on a key with skew?
>>> We want to partition on something like clientId. The problem is, this key
>>> has a very skewed (non-uniform) distribution:
>>> it's equally likely to see a key with 3k occurrences/day as one with
>>> 100k/day or 65 million/day.
>>> Cardinality of the key is around 1500 and there are approx 1 billion
>>> records per day.
>>> Partitioning by hashcode(key)%numOfPartitions will create a few "hot
>>> partitions" and cause a few brokers (and consumer threads) to be overloaded.
>>> Maybe those heavily loaded partitions end up evenly distributed among
>>> brokers, maybe they don't.
>>> 
>>> I read KIP-22
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>
>>> that explains how one could write a custom partitioner.
>>> I'd like to know how it was used to solve such data skew.
>>> We can compute some statistics on key distribution offline and use it
>>> in the partitioner.
>>> Is that a good idea? Or is it way too much logic for a partitioner?
>>> Anything else to consider?
>>> Any thoughts or reference will be helpful.
>>> 
>>> Thanks,
>>> Srikanth
>>> 
>> --
>> 
>> Jens Rantil
>> Backend Developer @ Tink
>> 
>> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden. For urgent matters
>> you can reach me at +46-708-84 18 32.
> 
