Ok, I see what you are doing. Unless you have 1500 partitions and 1500 consumers, some consumers will receive records for several different keys, and you will have to deal with that problem anyway. If you can run 1500 consumers and partitions, it will simplify your processing.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
Connect with us: Twitter | LinkedIn | Facebook | YouTube

-----Original Message-----
From: Srikanth [mailto:srikanth...@gmail.com]
Sent: Tuesday, May 03, 2016 1:57 PM
To: users@kafka.apache.org
Subject: Re: Hash partition of key with skew

So, there are a few consumers. One is a Spark Streaming job where we can do a partitionBy(key) and take a slight hit.
There are two consumers which are just Java apps, with multiple instances running in Marathon.
One consumer reads records, does basic checks, buffers records on local disk and uploads to S3 periodically.
Upload to S3 is partitioned by the "key" field, i.e., one folder per key. It does offset management to make sure offset commit is in sync with S3 upload.
Having a single consumer thread receive records from all keys is not ideal here. There are multiple challenges: initiating several uploads (one file per key), varying upload file sizes, etc.
These can be solved in a few ways. One of them is to have the Kafka producer partition by key. If I decide to do that, then I have to solve the question I posted first!

Srikanth

On Tue, May 3, 2016 at 1:22 PM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:

> Srikanth,
>
> I think the most efficient use of the partitions would be to spread
> all messages evenly across all partitions by *not* using a key. Then,
> all of your consumers in the same consumer group would receive about
> equal numbers of messages. What will you do with the messages as you
> pull them off of Kafka?
>
> -Dave
>
>
> -----Original Message-----
> From: Srikanth [mailto:srikanth...@gmail.com]
> Sent: Tuesday, May 03, 2016 12:12 PM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
>
> Jens,
> Thanks for the link. That is something to consider. Of course it has
> downsides too.
>
> Wesley,
> That is some good info on hashing.
> We've explored a couple of these options.
> I see that you are hesitant to put these in production. We also want
> to evaluate our options first.
>
> Dave,
> Our need to do this is similar to Wesley's. Consumers of the topic can
> be efficient if they get records from one (or very few) keys.
> Why do you think it is not applicable to Kafka? Are you suggesting
> that there are other ways to handle it when using Kafka?
>
> Srikanth
>
> On Tue, May 3, 2016 at 11:58 AM, Tauzell, Dave
> <dave.tauz...@surescripts.com> wrote:
>
> > Yeah, that makes sense for the target system (Cassandra for
> > example), but I don't see that you would need that for Kafka. Good
> > info on hashing, though, that I am going to take a look at when I
> > get time.
> >
> > -Dave
> >
> >
> > -----Original Message-----
> > From: Wesley Chow [mailto:w...@chartbeat.com]
> > Sent: Tuesday, May 03, 2016 10:51 AM
> > To: users@kafka.apache.org
> > Subject: Re: Hash partition of key with skew
> >
> > I’m not the OP, but in our case, we sometimes want data locality.
> > For example, suppose that we have 100 consumers that are building up
> > a cache of customer -> data mapping. If customer data is spread
> > randomly across all partitions then a query for that customer’s data
> > would have to hit all 100 consumers. If customer data exhibits some
> > locality, then queries for that data only hit a subset of consumers.
> >
> > Wes
> >
> >
> > > On May 3, 2016, at 11:18 AM, Tauzell, Dave
> > > <dave.tauz...@surescripts.com> wrote:
> > >
> > > Do you need the messages to be ordered in some way? Why pass a key
> > > if you don't want all the messages to go to one partition?
> > >
> > > -Dave
> > >
> > >
> > > -----Original Message-----
> > > From: Wesley Chow [mailto:w...@chartbeat.com]
> > > Sent: Tuesday, May 03, 2016 9:51 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: Hash partition of key with skew
> > >
> > > I’ve come up with a couple solutions since we too have a power law
> > > distribution. However, we have not put anything into practice.
> > >
> > > Fixed Slicing
> > >
> > > One simple thing to do is to take each key and slice it into some
> > > fixed number of partitions. So your function might be:
> > >
> > > (hash(key) % num) + (hash(key) % 10)
> > >
> > > In order to distribute it across 10 partitions. Or:
> > >
> > > hash(key + ‘0’) % num
> > > hash(key + ‘1’) % num
> > > …
> > > hash(key + ‘9’) % num
> > >
> > >
> > > Hyperspace Hashing
> > >
> > > If your data is multi-dimensional, then you might find hyperspace
> > > hashing useful. I’ll give a simple example, but it’s easy to
> > > generalize. Suppose that you have two dimensions you’d like to
> > > partition on: customer id (C) and city location (L). You’d like to
> > > be able to subscribe to all data for some subset of customers, and
> > > you’d also like to be able to subscribe to all data for some subset
> > > of locations. Suppose that this data goes into a topic with 256
> > > partitions.
> > >
> > > For any piece of data, you’d construct the partition it goes to
> > > like so:
> > >
> > > ((hash(C) % 16) << 4) + (hash(L) % 16)
> > >
> > > What that is basically saying is take C and map it to 16 different
> > > spots, and set it as the high 4 bits of an 8 bit int.
> > > Then take the location, map it to 16 different spots, and set it as
> > > the lower 4 bits of the int. The resulting number is the partition
> > > that piece of data goes to.
> > >
> > > Now if you want one particular C, you subscribe to the 16
> > > partitions that contain that C. If you want some particular L, you
> > > subscribe to the 16 partitions that contain that L.
> > >
> > > You can extend this scheme to an arbitrary number of dimensions
> > > subject to the number of partitions in the topic, and you can vary
> > > the number of bits that any particular dimension takes. This scheme
> > > suffers from a combinatorial explosion of partitions if you really
> > > want to query on lots of different dimensions, but you can see the
> > > Hyperdex paper for clues on how to deal with this.
> > >
> > >
> > > Unbalanced Hashing
> > >
> > > It’s easy to generate ok but not great hash functions. One is DJB
> > > hash, which relies on two empirically determined constants:
> > >
> > > http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
> > >
> > > (5381 and 33 in the above example)
> > >
> > > If you can do offline analysis, and your distribution doesn’t
> > > change over time, then you can basically exhaustively search for
> > > two values that produce a hash function that better distributes the
> > > load.
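
A rough sketch of the offline search described just above, assuming per-key weights (for example, records per day) are already available from offline analysis. The function names, the candidate constant lists and the 32-bit mask are all assumptions made for illustration; none of them come from Kafka or from the linked Stack Overflow post.

```python
from itertools import product


def djb_hash(key: str, seed: int = 5381, mult: int = 33) -> int:
    """DJB-style string hash with the two constants exposed as parameters.

    The result is masked to 32 bits (an assumption, to keep values bounded).
    """
    h = seed
    for byte in key.encode("utf-8"):
        h = (h * mult + byte) & 0xFFFFFFFF
    return h


def max_load(weights: dict, num_partitions: int, seed: int, mult: int) -> int:
    """Weight landing on the busiest partition for one (seed, mult) pair."""
    loads = [0] * num_partitions
    for key, weight in weights.items():
        loads[djb_hash(key, seed, mult) % num_partitions] += weight
    return max(loads)


def search_constants(weights: dict, num_partitions: int, seeds, mults):
    """Exhaustively try every (seed, mult) pair and keep the one whose
    busiest partition carries the least weight."""
    return min(
        product(seeds, mults),
        key=lambda pair: max_load(weights, num_partitions, pair[0], pair[1]),
    )
```

Calling `search_constants(weights, num_partitions, seeds, mults)` with the default pair (5381, 33) included among the candidates can never do worse than the default on the measured weights, but the result is only as good as the weights stay stable over time.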
> > >
> > >
> > > Greedy Knapsack
> > >
> > > But if you’re ok doing offline analysis and generating your own
> > > hash function, then you can create one that’s simply a hard coded
> > > list of mappings for the heaviest keys, and then defaults to a
> > > regular hash for the rest. The easiest way to programmatically do
> > > this is to use a greedy algorithm:
> > >
> > > for each heavy key, k:
> > >     assign k to the partition with the least assigned weight
> > >
> > >
> > > The advantage to fixed slicing and hyperspace hashing is that you
> > > don’t have to know your distribution a priori, and it generally
> > > scales well as you increase the number of keys. The disadvantage is
> > > that one key’s data is split across multiple partitions.
> > >
> > > The advantage to unbalanced hashing and greedy knapsack is that
> > > you can get close to an optimal partitioning scheme and all of one
> > > key resides in one partition. The downside is that you need to do
> > > partition mapping management as your distribution changes over
> > > time.
> > >
> > > Hopefully that gives you some ideas!
> > >
> > > Wes
> > >
> > >
> > >
> > >> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> > >>
> > >> Hi,
> > >>
> > >> Not sure if this helps, but the way Loggly seem to do it is to
> > >> have a separate topic for "noisy neighbors". See [1].
> > >>
> > >> [1]
> > >> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
> > >>
> > >> Cheers,
> > >> Jens
> > >>
> > >> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
> > >>
> > >>> Hello,
> > >>>
> > >>> Is there a recommendation for handling producer side
> > >>> partitioning based on a key with skew?
> > >>> We want to partition on something like clientId. Problem is,
> > >>> this key has a non-uniform distribution.
> > >>> It's equally likely to see a key with 3k occurrences/day vs
> > >>> 100k/day vs 65 million/day.
> > >>> Cardinality of the key is around 1500 and there are approx 1 billion
> > >>> records per day.
> > >>> Partitioning by hashcode(key)%numOfPartition will create a few
> > >>> "hot partitions" and cause a few brokers (and consumer threads)
> > >>> to be overloaded.
> > >>> Maybe these partitions with heavy load are evenly distributed
> > >>> among brokers, maybe they are not.
> > >>>
> > >>> I read KIP-22
> > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>
> > >>> that explains how one could write a custom partitioner.
> > >>> I'd like to know how it was used to solve such data skew.
> > >>> We can compute some statistics on key distribution offline and
> > >>> use it in the partitioner.
> > >>> Is that a good idea? Or is it way too much logic for a partitioner?
> > >>> Anything else to consider?
> > >>> Any thoughts or reference will be helpful.
> > >>>
> > >>> Thanks,
> > >>> Srikanth
> > >>>
> > >> --
> > >>
> > >> Jens Rantil
> > >> Backend Developer @ Tink
> > >>
> > >> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> > >> For urgent matters you can reach me at +46-708-84 18 32.
> > >
> > > This e-mail and any files transmitted with it are confidential,
> > > may contain sensitive information, and are intended solely for the
> > > use of the individual or entity to whom they are addressed. If you
> > > have received this e-mail in error, please notify the sender by
> > > reply e-mail immediately and destroy all copies of the e-mail and
> > > any attachments.
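
For the idea raised in the original question (compute key statistics offline and use them in the partitioner), combined with the greedy assignment from the "Greedy Knapsack" suggestion, a minimal sketch might look like the one below. It is plain Python rather than an implementation of Kafka's Partitioner interface. The names `greedy_assign` and `partition_for` are hypothetical, sorting the heavy keys heaviest-first is an added assumption, and CRC32 is only a stand-in for whatever regular hash the producer would otherwise use.

```python
import heapq
import zlib


def greedy_assign(heavy_weights: dict, num_partitions: int) -> dict:
    """Map each heavy key to the partition with the least assigned weight.

    heavy_weights maps key -> measured weight (e.g. records per day).
    Keys are processed heaviest first (an assumption, not in the thread).
    """
    # Min-heap of (assigned_weight, partition) so the lightest partition
    # is always at the top.
    heap = [(0, p) for p in range(num_partitions)]
    heapq.heapify(heap)
    mapping = {}
    for key, weight in sorted(
        heavy_weights.items(), key=lambda kv: kv[1], reverse=True
    ):
        load, partition = heapq.heappop(heap)
        mapping[key] = partition
        heapq.heappush(heap, (load + weight, partition))
    return mapping


def partition_for(key: str, mapping: dict, num_partitions: int) -> int:
    """Use the hard-coded mapping for heavy keys, a regular hash otherwise."""
    if key in mapping:
        return mapping[key]
    # CRC32 is used because it is stable across processes, unlike
    # Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

The mapping has to be regenerated and redistributed to producers whenever the key distribution shifts, which is the mapping-management cost mentioned in the thread.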