I’m not the OP, but in our case we sometimes want data locality. For example, suppose we have 100 consumers, each building up a cache of customer -> data mappings. If customer data is spread randomly across all partitions, then a query for one customer’s data has to hit all 100 consumers. If customer data exhibits some locality, queries for that data only hit a subset of the consumers.
Wes

> On May 3, 2016, at 11:18 AM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
>
> Do you need the messages to be ordered in some way? Why pass a key if you don't want all the messages to go to one partition?
>
> -Dave
>
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
>
> -----Original Message-----
> From: Wesley Chow [mailto:w...@chartbeat.com]
> Sent: Tuesday, May 03, 2016 9:51 AM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
>
> I've come up with a couple of solutions, since we too have a power-law distribution. However, we have not put anything into practice.
>
> Fixed Slicing
>
> One simple thing to do is to take each key and slice it across some fixed number of partitions. With a slice number chosen per message (round-robin or at random) from 0-9, your function might be:
>
>     (hash(key) + slice) % num
>
> in order to distribute each key across 10 partitions. Or, similarly, salt the key with one of ten suffixes:
>
>     hash(key + '0') % num
>     hash(key + '1') % num
>     ...
>     hash(key + '9') % num
>
>
> Hyperspace Hashing
>
> If your data is multi-dimensional, then you might find hyperspace hashing useful. I'll give a simple example, but it's easy to generalize. Suppose that you have two dimensions you'd like to partition on: customer id (C) and city location (L). You'd like to be able to subscribe to all data for some subset of customers, and you'd also like to be able to subscribe to all data for some subset of locations. Suppose that this data goes into a topic with 256 partitions.
>
> For any piece of data, you'd construct the partition it goes to like so:
>
>     ((hash(C) % 16) << 4) + (hash(L) % 16)
>
> What that is basically saying is: take C, map it to 16 different spots, and set it as the high 4 bits of an 8-bit int. Then take the location, map it to 16 different spots, and set it as the low 4 bits of the int. The resulting number is the partition that piece of data goes to.
>
> Now if you want one particular C, you subscribe to the 16 partitions that contain that C. If you want some particular L, you subscribe to the 16 partitions that contain that L.
>
> You can extend this scheme to an arbitrary number of dimensions, subject to the number of partitions in the topic, and you can vary the number of bits that any particular dimension takes. This scheme suffers from a combinatorial explosion of partitions if you really want to query on lots of different dimensions, but see the HyperDex paper for clues on how to deal with this.
>
>
> Unbalanced Hashing
>
> It's easy to generate hash functions that are OK but not great. One is DJB hash, which relies on two empirically determined constants:
>
> http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
>
> (5381 and 33 in the above example)
>
> If you can do offline analysis, and your distribution doesn't change over time, then you can basically do an exhaustive search for two constants that produce a hash function that better distributes the load.
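
To make the fixed-slicing and hyperspace-hashing ideas above concrete, here is a rough, untested sketch in plain Java. Everything in it is illustrative: the class and method names are made up, CRC32 is only a stand-in for any reasonably uniform hash (it is not what Kafka's default partitioner uses), and Math.floorMod just keeps the results non-negative.

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class SkewPartitionSketch {

        // Stand-in hash; anything reasonably uniform works here.
        static int hash(String s) {
            CRC32 crc = new CRC32();
            crc.update(s.getBytes(StandardCharsets.UTF_8));
            return (int) crc.getValue();
        }

        // Fixed slicing: spread one key over 10 partitions by salting it
        // with a per-message slice number (chosen round-robin or at random).
        static int slicedPartition(String key, int slice, int numPartitions) {
            return Math.floorMod(hash(key + slice), numPartitions);
        }

        // Hyperspace hashing over 256 partitions: customer id in the high
        // 4 bits, location in the low 4 bits.
        static int hyperspacePartition(String customerId, String location) {
            int c = Math.floorMod(hash(customerId), 16);
            int l = Math.floorMod(hash(location), 16);
            return (c << 4) + l; // 0..255
        }

        public static void main(String[] args) {
            // A consumer that only cares about customer "acme" subscribes to
            // the 16 partitions (c << 4) + l for l = 0..15.
            int c = Math.floorMod(hash("acme"), 16);
            for (int l = 0; l < 16; l++) {
                System.out.println("subscribe to partition " + ((c << 4) + l));
            }
            System.out.println("slice 3 of key acme -> " + slicedPartition("acme", 3, 100));
        }
    }
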
>
> Greedy Knapsack
>
> But if you're OK doing offline analysis and generating your own hash function, then you can create one that is simply a hard-coded list of mappings for the heaviest keys and defaults to a regular hash for the rest. The easiest way to do this programmatically is with a greedy algorithm:
>
>     for each heavy key, k:
>         assign k to the partition with the least assigned weight
>
> The advantage of fixed slicing and hyperspace hashing is that you don't have to know your distribution a priori, and they generally scale well as you increase the number of keys. The disadvantage is that one key's data is split across multiple partitions.
>
> The advantage of unbalanced hashing and greedy knapsack is that you can get close to an optimal partitioning scheme and all of one key resides in one partition. The downside is that you need to manage the partition mapping as your distribution changes over time.
>
> Hopefully that gives you some ideas!
>
> Wes
>
>
>> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
>>
>> Hi,
>>
>> Not sure if this helps, but the way Loggly seems to do it is to have a separate topic for "noisy neighbors". See [1].
>>
>> [1] https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
>>
>> Cheers,
>> Jens
>>
>> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is there a recommendation for handling producer-side partitioning based on a key with skew?
>>> We want to partition on something like clientId. Problem is, this key has a non-uniform distribution.
>>> It's equally likely to see a key with 3k occurrences/day vs 100k/day vs 65 million/day.
>>> Cardinality of the key is around 1500 and there are approximately 1 billion records per day.
>>> Partitioning by hashcode(key) % numOfPartition will create a few "hot partitions" and cause a few brokers (and consumer threads) to be overloaded. Maybe these heavily loaded partitions end up evenly distributed among brokers, maybe they don't.
>>>
>>> I read KIP-22
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>,
>>> which explains how one could write a custom partitioner.
>>> I'd like to know how it has been used to solve this kind of data skew.
>>> We can compute some statistics on the key distribution offline and use them in the partitioner.
>>> Is that a good idea? Or is it way too much logic for a partitioner? Anything else to consider?
>>> Any thoughts or references will be helpful.
>>>
>>> Thanks,
>>> Srikanth
>>
>> --
>>
>> Jens Rantil
>> Backend Developer @ Tink
>>
>> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
>> For urgent matters you can reach me at +46-708-84 18 32.
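
For the greedy-knapsack idea and the KIP-22 custom-partitioner question quoted above, a rough sketch of how they might fit together follows. It is an illustration, not a tested implementation: the class name, the hard-coded heavy keys, and the heaviest-first sort (a common refinement; the quoted algorithm just says "for each heavy key") are my own, while Partitioner, Cluster, and Utils.murmur2 come from the Kafka producer API.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    public class GreedyKnapsackPartitioner implements Partitioner {

        // heavy key -> pinned partition, computed offline by greedyAssign()
        private final Map<String, Integer> pinned = new HashMap<>();

        // Offline step: walk the heavy keys and put each one on the partition
        // with the least weight assigned so far. `weights` maps heavy key ->
        // expected records/day from offline analysis.
        static Map<String, Integer> greedyAssign(Map<String, Long> weights, int numPartitions) {
            long[] load = new long[numPartitions];
            Map<String, Integer> assignment = new HashMap<>();
            weights.entrySet().stream()
                    .sorted((a, b) -> Long.compare(b.getValue(), a.getValue())) // heaviest first
                    .forEach(e -> {
                        int lightest = 0;
                        for (int p = 1; p < numPartitions; p++) {
                            if (load[p] < load[lightest]) lightest = p;
                        }
                        assignment.put(e.getKey(), lightest);
                        load[lightest] += e.getValue();
                    });
            return assignment;
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // In practice the precomputed assignment would be loaded here
            // (from a file, config, etc.). Hard-coded for the sketch:
            pinned.put("client-with-65M-per-day", 0);
            pinned.put("another-heavy-client", 1);
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null) {
                return 0; // no key; the built-in partitioner spreads these out instead
            }
            Integer p = (key instanceof String) ? pinned.get(key) : null;
            if (p != null && p < numPartitions) {
                return p; // heavy key: use the precomputed mapping
            }
            // Everything else falls back to a plain murmur2 hash.
            return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {}
    }

The producer would pick this up via the partitioner.class config, and the pinned map would need to be regenerated and redeployed whenever the key distribution drifts, which is the "partition mapping management" downside mentioned in the quoted message.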