Having 1 partition and consumer thread per unique key value will result in the hot partition problem: some keys get a disproportionately high number of records. It should be fine if a consumer has to deal with a few keys; it doesn't have to be a 1:1 mapping. Maybe I should try to solve this some other way. The other alternatives I thought of involve another step in processing.
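For what it's worth, here is a rough sketch of the kind of skew-aware producer partitioner being weighed in this thread, assuming the new producer's Partitioner interface from KIP-22. The key names and pinned partition numbers are made up for illustration; this is not an implementation from the thread.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // Sketch only: pin a handful of known-heavy keys to dedicated
    // partitions and hash everything else.
    public class SkewAwarePartitioner implements Partitioner {

        // In practice this map would come from offline statistics on the
        // key distribution; the entries here are hypothetical.
        private final Map<String, Integer> heavyKeys = new HashMap<>();

        @Override
        public void configure(Map<String, ?> configs) {
            heavyKeys.put("client-42", 0);
            heavyKeys.put("client-917", 1);
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            Integer pinned = heavyKeys.get(String.valueOf(key));
            if (pinned != null) {
                return pinned % numPartitions;   // hot key goes to its own partition
            }
            // all other keys fall back to plain hash partitioning
            return (String.valueOf(key).hashCode() & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {}
    }

A producer would pick it up via props.put("partitioner.class", SkewAwarePartitioner.class.getName()); the open question in the thread is how to build and maintain the heavy-key map.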
Srikanth

On Tue, May 3, 2016 at 4:02 PM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:

> Ok, I see what you are doing. Unless you have 1500 partitions and 1500 consumers, you will have consumers get records for different keys and will have to deal with the problem. If you can have 1500 consumers and partitions it will simplify your processing.
>
> -Dave
>
> -----Original Message-----
> From: Srikanth [mailto:srikanth...@gmail.com]
> Sent: Tuesday, May 03, 2016 1:57 PM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
>
> So, there are a few consumers. One is a Spark Streaming job, where we can do a partitionBy(key) and take a slight hit.
> There are two consumers which are just Java apps, with multiple instances running in Marathon.
> One consumer reads records, does basic checks, buffers records on local disk and uploads to S3 periodically. The upload to S3 is partitioned by the "key" field, i.e., one folder per key. It does offset management to make sure the offset commit is in sync with the S3 upload.
> Having a single consumer thread receive records from all keys is not ideal here. There are multiple challenges: initiating several uploads (one file per key), varying upload file sizes, etc.
> These can be solved in a few ways. One of them is to have the Kafka producer partition by key. If I decide to do that then I have to solve the question I posted first!!!
>
> Srikanth
>
> On Tue, May 3, 2016 at 1:22 PM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
>
> > Srikanth,
> >
> > I think the most efficient use of the partitions would be to spread all messages evenly across all partitions by *not* using a key. Then, all of your consumers in the same consumer group would receive about equal numbers of messages. What will you do with the messages as you pull them off of Kafka?
> >
> > -Dave
> >
> > -----Original Message-----
> > From: Srikanth [mailto:srikanth...@gmail.com]
> > Sent: Tuesday, May 03, 2016 12:12 PM
> > To: users@kafka.apache.org
> > Subject: Re: Hash partition of key with skew
> >
> > Jens,
> > Thanks for the link. That is something to consider. Of course it has downsides too.
> >
> > Wesley,
> > That is some good info on hashing. We've explored a couple of these options. I see that you are hesitant to put these in production. We too want to evaluate our options first.
> >
> > Dave,
> > Our need to do this is similar to Wesley's. And consumers of the topic can be efficient if they get records from one (or a very few) keys. Why do you think it is not applicable to Kafka? Are you suggesting that there are other ways to handle it when using Kafka?
> >
> > Srikanth
> >
> > On Tue, May 3, 2016 at 11:58 AM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
> >
> > > Yeah, that makes sense for the target system (Cassandra for example), but I don't see that you would need that for Kafka. Good info on hashing, though, that I am going to take a look at when I get time.
> > >
> > > -Dave
> > >
> > > -----Original Message-----
> > > From: Wesley Chow [mailto:w...@chartbeat.com]
> > > Sent: Tuesday, May 03, 2016 10:51 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: Hash partition of key with skew
> > >
> > > I’m not the OP, but in our case, we sometimes want data locality. For example, suppose that we have 100 consumers that are building up a cache of customer -> data mappings. If customer data is spread randomly across all partitions, then a query for that customer’s data would have to hit all 100 consumers. If customer data exhibits some locality, then queries for that data only hit a subset of consumers.
> > >
> > > Wes
> > >
> > > > On May 3, 2016, at 11:18 AM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
> > > >
> > > > Do you need the messages to be ordered in some way? Why pass a key if you don't want all the messages to go to one partition?
> > > >
> > > > -Dave
> > > >
> > > > -----Original Message-----
> > > > From: Wesley Chow [mailto:w...@chartbeat.com]
> > > > Sent: Tuesday, May 03, 2016 9:51 AM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Hash partition of key with skew
> > > >
> > > > I’ve come up with a couple of solutions, since we too have a power-law distribution. However, we have not put anything into practice.
> > > >
> > > > Fixed Slicing
> > > >
> > > > One simple thing to do is to take each key and slice it into some fixed number of partitions. So your function might be:
> > > >
> > > > (hash(key) % num) + (hash(key) % 10)
> > > >
> > > > in order to distribute it across 10 partitions. Or:
> > > >
> > > > hash(key + ‘0’) % num
> > > > hash(key + ‘1’) % num
> > > > ...
> > > > hash(key + ‘9’) % num
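A plain-Java sketch of the suffix form of fixed slicing; the hash choice, slice count, and key name are illustrative, not from the thread:

    import java.util.concurrent.ThreadLocalRandom;

    public final class FixedSlicing {

        // Producer side: append a random slice suffix before hashing, so one
        // hot key is spread over `slices` of the topic's partitions.
        public static int partitionFor(String key, int numPartitions, int slices) {
            int slice = ThreadLocalRandom.current().nextInt(slices);   // e.g. 0..9
            return ((key + slice).hashCode() & 0x7fffffff) % numPartitions;
        }

        // Consumer side: the partitions that can contain a given key are
        // enumerable, since the suffixes are fixed.
        public static int[] partitionsFor(String key, int numPartitions, int slices) {
            int[] result = new int[slices];
            for (int s = 0; s < slices; s++) {
                result[s] = ((key + s).hashCode() & 0x7fffffff) % numPartitions;
            }
            return result;
        }
    }

The trade-off, as noted further down the thread, is that one key's data ends up in several partitions, so a consumer interested in that key has to read all of its slices.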
> > > > Hyperspace Hashing
> > > >
> > > > If your data is multi-dimensional, then you might find hyperspace hashing useful. I’ll give a simple example, but it’s easy to generalize. Suppose that you have two dimensions you’d like to partition on: customer id (C) and city location (L). You’d like to be able to subscribe to all data for some subset of customers, and you’d also like to be able to subscribe to all data for some subset of locations. Suppose that this data goes into a topic with 256 partitions.
> > > >
> > > > For any piece of data, you’d construct the partition it goes to like so:
> > > >
> > > > ((hash(C) % 16) << 4) + (hash(L) % 16)
> > > >
> > > > What that is basically saying is: take C and map it to 16 different spots, and set that as the high 4 bits of an 8-bit int. Then take the location, map it to 16 different spots, and set that as the lower 4 bits of the int. The resulting number is the partition that piece of data goes to.
> > > >
> > > > Now if you want one particular C, you subscribe to the 16 partitions that contain that C. If you want some particular L, you subscribe to the 16 partitions that contain that L.
> > > >
> > > > You can extend this scheme to an arbitrary number of dimensions, subject to the number of partitions in the topic, and you can vary the number of bits that any particular dimension takes. This scheme suffers from a combinatorial explosion of partitions if you really want to query on lots of different dimensions, but you can see the HyperDex paper for clues on how to deal with this.
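The same 256-partition example as a small Java sketch; the hash functions and names are illustrative:

    public final class HyperspaceHashing {

        // 256 partitions: high 4 bits from the customer id, low 4 bits from the location.
        public static int partitionFor(String customerId, String location) {
            int c = customerId.hashCode() & 0x7fffffff;
            int l = location.hashCode() & 0x7fffffff;
            return ((c % 16) << 4) | (l % 16);
        }

        // All 16 partitions that can contain a given customer, regardless of location.
        public static int[] partitionsForCustomer(String customerId) {
            int high = (customerId.hashCode() & 0x7fffffff) % 16;
            int[] partitions = new int[16];
            for (int l = 0; l < 16; l++) {
                partitions[l] = (high << 4) | l;
            }
            return partitions;
        }
    }

A consumer wanting everything for one customer would subscribe to the 16 partitions returned by partitionsForCustomer, regardless of location.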
> > > > Unbalanced Hashing
> > > >
> > > > It’s easy to generate ok but not great hash functions. One is DJB hash, which relies on two empirically determined constants:
> > > >
> > > > http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
> > > >
> > > > (5381 and 33 in the above example)
> > > >
> > > > If you can do offline analysis, and your distribution doesn’t change over time, then you can basically exhaustively search for two values that produce a hash function that better distributes the load.
> > > >
> > > > Greedy Knapsack
> > > >
> > > > But if you’re ok doing offline analysis and generating your own hash function, then you can create one that’s simply a hard-coded list of mappings for the heaviest keys, and then defaults to a regular hash for the rest. The easiest way to programmatically do this is to use a greedy algorithm:
> > > >
> > > > for each heavy key, k:
> > > >     assign k to the partition with the least assigned weight
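A sketch of that greedy assignment in Java; the weights would come from offline analysis, and all names here are illustrative:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.PriorityQueue;

    public final class GreedyKeyAssignment {

        // Assign each heavy key, heaviest first, to the partition with the
        // least accumulated weight so far.
        public static Map<String, Integer> assign(Map<String, Long> keyWeights, int numPartitions) {
            // min-heap of {partition, accumulated weight}
            PriorityQueue<long[]> load = new PriorityQueue<>(Comparator.comparingLong((long[] p) -> p[1]));
            for (int p = 0; p < numPartitions; p++) {
                load.add(new long[] { p, 0L });
            }
            Map<String, Integer> assignment = new HashMap<>();
            keyWeights.entrySet().stream()
                    .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                    .forEach(e -> {
                        long[] lightest = load.poll();      // least-loaded partition
                        assignment.put(e.getKey(), (int) lightest[0]);
                        lightest[1] += e.getValue();
                        load.add(lightest);
                    });
            return assignment;
        }
    }

The resulting map is exactly the kind of hard-coded heavy-key table that a custom partitioner (like the sketch near the top of the thread) would consult, with all remaining keys falling through to a regular hash.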
> > > > The advantage to fixed slicing and hyperspace hashing is that you don’t have to know your distribution a priori, and it generally scales well as you increase the number of keys. The disadvantage is that one key’s data is split across multiple partitions.
> > > >
> > > > The advantage to unbalanced hashing and greedy knapsack is that you can get close to an optimal partitioning scheme and all of one key resides in one partition. The downside is that you need to do partition mapping management as your distribution changes over time.
> > > >
> > > > Hopefully that gives you some ideas!
> > > >
> > > > Wes
> > > >
> > > >> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> Not sure if this helps, but the way Loggly seem to do it is to have a separate topic for "noisy neighbors". See [1].
> > > >>
> > > >> [1] https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
> > > >>
> > > >> Cheers,
> > > >> Jens
> > > >>
> > > >> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
> > > >>
> > > >>> Hello,
> > > >>>
> > > >>> Is there a recommendation for handling producer-side partitioning based on a key with skew?
> > > >>> We want to partition on something like clientId. Problem is, this key does not have a uniform distribution. It's equally likely to see a key with 3k occurrences/day vs 100k/day vs 65 million/day. The cardinality of the key is around 1500 and there are approx 1 billion records per day.
> > > >>> Partitioning by hashcode(key)%numOfPartitions will create a few "hot partitions" and cause a few brokers (and consumer threads) to be overloaded. Maybe these partitions with heavy load are evenly distributed among brokers, maybe they are not.
> > > >>> I read KIP-22 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer> which explains how one could write a custom partitioner.
> > > >>> I'd like to know how it was used to solve such data skew.
> > > >>> We can compute some statistics on key distribution offline and use it in the partitioner. Is that a good idea? Or is it way too much logic for a partitioner? Anything else to consider?
> > > >>> Any thoughts or references will be helpful.
> > > >>>
> > > >>> Thanks,
> > > >>> Srikanth
> > > >>
> > > >> --
> > > >> Jens Rantil
> > > >> Backend Developer @ Tink
> > > >> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> > > >> For urgent matters you can reach me at +46-708-84 18 32.
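For the "compute some statistics on key distribution offline" idea in the original question, a minimal sketch of sampling per-key record counts with a plain consumer; the broker address, group id, topic name, sample size, and the 1% cut-off are all placeholders:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KeyDistributionSample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");      // placeholder
            props.put("group.id", "key-distribution-sampler");     // placeholder
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            Map<String, Long> counts = new HashMap<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("events"));   // hypothetical topic
                for (int i = 0; i < 1000; i++) {                            // bounded sample
                    ConsumerRecords<String, String> records = consumer.poll(500);
                    for (ConsumerRecord<String, String> r : records) {
                        if (r.key() != null) {
                            counts.merge(r.key(), 1L, Long::sum);
                        }
                    }
                }
            }

            // keys above ~1% of the sample are candidates for special treatment
            long total = counts.values().stream().mapToLong(Long::longValue).sum();
            counts.forEach((k, v) -> {
                if (v * 100 >= total) System.out.println(k + "\t" + v);
            });
        }
    }

The keys it prints would feed the greedy assignment sketch above, and the resulting map is what a custom partitioner would load.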