Yeah, that makes sense for the target system (Cassandra, for example), but I don't see why you would need that for Kafka. Good info on hashing, though; I am going to take a look at it when I get time.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
Connect with us: Twitter | LinkedIn | Facebook | YouTube

-----Original Message-----
From: Wesley Chow [mailto:w...@chartbeat.com]
Sent: Tuesday, May 03, 2016 10:51 AM
To: users@kafka.apache.org
Subject: Re: Hash partition of key with skew

I’m not the OP, but in our case, we sometimes want data locality. For
example, suppose that we have 100 consumers that are building up a cache of
customer -> data mappings. If customer data is spread randomly across all
partitions, then a query for that customer’s data would have to hit all 100
consumers. If customer data exhibits some locality, then queries for that
data only hit a subset of consumers.

Wes

> On May 3, 2016, at 11:18 AM, Tauzell, Dave <dave.tauz...@surescripts.com>
> wrote:
>
> Do you need the messages to be ordered in some way? Why pass a key if you
> don't want all the messages to go to one partition?
>
> -Dave
>
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
> Connect with us: Twitter | LinkedIn | Facebook | YouTube
>
>
> -----Original Message-----
> From: Wesley Chow [mailto:w...@chartbeat.com]
> Sent: Tuesday, May 03, 2016 9:51 AM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
>
> I’ve come up with a couple of solutions, since we too have a power law
> distribution. However, we have not put anything into practice.
>
> Fixed Slicing
>
> One simple thing to do is to take each key and slice it into some fixed
> number of partitions. So your function might be:
>
> (hash(key) + rand(10)) % num
>
> where rand(10) is a random integer from 0 to 9 chosen per message, in
> order to distribute each key across 10 partitions.
> Or, pick one of these at random for each message:
>
> hash(key + '0') % num
> hash(key + '1') % num
> …
> hash(key + '9') % num
>
>
> Hyperspace Hashing
>
> If your data is multi-dimensional, then you might find hyperspace hashing
> useful. I’ll give a simple example, but it’s easy to generalize. Suppose
> that you have two dimensions you’d like to partition on: customer id (C)
> and city location (L). You’d like to be able to subscribe to all data for
> some subset of customers, and you’d also like to be able to subscribe to
> all data for some subset of locations. Suppose that this data goes into a
> topic with 256 partitions.
>
> For any piece of data, you’d construct the partition it goes to like so:
>
> ((hash(C) % 16) << 4) + (hash(L) % 16)
>
> In other words, map C to one of 16 values and use that as the high 4 bits
> of an 8-bit int. Then map the location to one of 16 values and use that
> as the low 4 bits of the int. The resulting number is the partition that
> piece of data goes to.
>
> Now if you want one particular C, you subscribe to the 16 partitions that
> contain that C. If you want some particular L, you subscribe to the 16
> partitions that contain that L.
>
> You can extend this scheme to an arbitrary number of dimensions, subject
> to the number of partitions in the topic, and you can vary the number of
> bits that any particular dimension takes. This scheme suffers from a
> combinatorial explosion of partitions if you really want to query on lots
> of different dimensions, but you can see the HyperDex paper for clues on
> how to deal with this.
>
>
> Unbalanced Hashing
>
> It’s easy to generate OK but not great hash functions.
> One is DJB hash, which relies on two empirically determined constants:
>
> http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
>
> (5381 and 33 in the above example)
>
> If you can do offline analysis, and your distribution doesn’t change over
> time, then you can basically exhaustively search for two values that
> produce a hash function that better distributes the load.
>
>
> Greedy Knapsack
>
> But if you’re OK doing offline analysis and generating your own hash
> function, then you can create one that’s simply a hard-coded list of
> mappings for the heaviest keys, and then defaults to a regular hash for
> the rest. The easiest way to programmatically do this is to use a greedy
> algorithm:
>
> for each heavy key, k:
>     assign k to the partition with the least assigned weight
>
>
> The advantage to fixed slicing and hyperspace hashing is that you don’t
> have to know your distribution a priori, and it generally scales well as
> you increase the number of keys. The disadvantage is that one key’s data
> is split across multiple partitions.
>
> The advantage to unbalanced hashing and greedy knapsack is that you can
> get close to an optimal partitioning scheme and all of one key resides in
> one partition. The downside is that you need to do partition mapping
> management as your distribution changes over time.
>
> Hopefully that gives you some ideas!
>
> Wes
>
>
>
>> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
>>
>> Hi,
>>
>> Not sure if this helps, but the way Loggly seem to do it is to have a
>> separate topic for "noisy neighbors". See [1].
>>
>> [1]
>> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
>>
>> Cheers,
>> Jens
>>
>> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is there a recommendation for handling producer-side partitioning
>>> based on a key with skew?
>>> We want to partition on something like clientId. Problem is, this
>>> key has a non-uniform distribution.
>>> It's equally likely to see a key with 3k occurrences/day vs 100k/day
>>> vs 65 million/day.
>>> Cardinality of the key is around 1500 and there are approx 1 billion
>>> records per day.
>>> Partitioning by hashcode(key)%numOfPartition will create a few "hot
>>> partitions" and cause a few brokers (and consumer threads) to be
>>> overloaded.
>>> Maybe these partitions with heavy load are evenly distributed among
>>> brokers, maybe they are not.
>>>
>>> I read KIP-22
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>
>>> that explains how one could write a custom partitioner.
>>> I'd like to know how it was used to solve such data skew.
>>> We can compute some statistics on key distribution offline and use
>>> them in the partitioner.
>>> Is that a good idea? Or is it way too much logic for a partitioner?
>>> Anything else to consider?
>>> Any thoughts or references will be helpful.
>>>
>>> Thanks,
>>> Srikanth
>>>
>> --
>>
>> Jens Rantil
>> Backend Developer @ Tink
>>
>> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
>> For urgent matters you can reach me at +46-708-84 18 32.
>
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received
> this e-mail in error, please notify the sender by reply e-mail
> immediately and destroy all copies of the e-mail and any attachments.
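
For reference, three of the schemes Wes describes in this thread (fixed slicing, hyperspace hashing, and the greedy mapping of heavy keys) could be sketched in Python roughly as follows. This is only an illustration under stated assumptions, not code from any poster and not Kafka's Partitioner API: all function names are hypothetical, CRC32 stands in for "a regular hash", and sorting heavy keys heaviest-first is an addition of this sketch (the thread only says to assign each heavy key to the least-loaded partition).

```python
import heapq
import random
import zlib


def stable_hash(value):
    """Deterministic non-negative hash. CRC32 is an assumption standing in
    for 'a regular hash'; Python's built-in hash() is salted per process."""
    return zlib.crc32(value.encode("utf-8"))


def fixed_slice_partition(key, num_partitions, slices=10):
    """Fixed slicing: append a random salt in [0, slices) so that one key
    is spread over at most `slices` partitions."""
    salt = random.randrange(slices)
    return stable_hash(f"{key}{salt}") % num_partitions


def hyperspace_partition(customer, location):
    """Hyperspace hashing for a 256-partition topic: customer id in the
    high 4 bits, location in the low 4 bits."""
    return ((stable_hash(customer) % 16) << 4) + (stable_hash(location) % 16)


def greedy_assignment(key_weights, num_partitions):
    """Greedy knapsack: place each heavy key on the partition with the
    least assigned weight so far (heaviest keys first, by assumption)."""
    heap = [(0.0, p) for p in range(num_partitions)]  # (load, partition)
    heapq.heapify(heap)
    assignment = {}
    ordered = sorted(key_weights.items(), key=lambda kv: (-kv[1], kv[0]))
    for key, weight in ordered:
        load, partition = heapq.heappop(heap)
        assignment[key] = partition
        heapq.heappush(heap, (load + weight, partition))
    return assignment


def mapped_partition(key, assignment, num_partitions):
    """Hard-coded mapping for heavy keys, regular hash for everything else."""
    if key in assignment:
        return assignment[key]
    return stable_hash(key) % num_partitions


if __name__ == "__main__":
    # Daily volumes taken from the figures in Srikanth's question.
    weights = {"clientA": 65_000_000, "clientB": 100_000, "clientC": 3_000}
    table = greedy_assignment(weights, num_partitions=4)
    print(table)
    print(mapped_partition("clientA", table, 4))
    print(hyperspace_partition("customer-1", "stockholm"))
    print(fixed_slice_partition("clientA", 64))
```

The mapping returned by greedy_assignment would be computed offline from key statistics and shipped to producers; it needs to be regenerated when the key distribution changes, which is the management cost Wes mentions.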