Having one partition and consumer thread per unique key value will result in
the hot partition problem, since some keys get a disproportionately high
number of records.
It should be fine if a consumer has to deal with a few keys; it doesn't
have to be a 1:1 mapping.
Maybe I should try to solve this some other way. The other alternatives I
thought of involve another step in processing.

Srikanth

On Tue, May 3, 2016 at 4:02 PM, Tauzell, Dave <dave.tauz...@surescripts.com>
wrote:

> Ok, I see what you are doing. Unless you have 1500 partitions and 1500
> consumers, you will have consumers getting records for different keys and
> will have to deal with the problem. If you can have 1500 consumers and
> partitions, it will simplify your processing.
>
> -Dave
>
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com |   dave.tauz...@surescripts.com
> Connect with us: Twitter I LinkedIn I Facebook I YouTube
>
>
> -----Original Message-----
> From: Srikanth [mailto:srikanth...@gmail.com]
> Sent: Tuesday, May 03, 2016 1:57 PM
> To: users@kafka.apache.org
> Subject: Re: Hash partition of key with skew
>
> So, there are a few consumers. One is a Spark Streaming job where we can
> do a partitionBy(key) and take a slight hit.
> There are two consumers which are just Java apps, with multiple instances
> running in Marathon.
> One consumer reads records, does basic checks, buffers records on local
> disk and uploads to S3 periodically.
> Upload to S3 is partitioned by the "key" field, i.e., one folder per key.
> It does offset management to make sure offset commit is in sync with S3
> upload.
> Having a single consumer thread receive records from all keys is not ideal
> here. There are multiple challenges: initiating several uploads (one file per
> key), varying upload file sizes, etc.
> These can be solved in a few ways. One of them is to have the Kafka producer
> partition by key. If I decide to do that, then I have to solve the question
> I posted first!
>
> Srikanth
>
> On Tue, May 3, 2016 at 1:22 PM, Tauzell, Dave <
> dave.tauz...@surescripts.com>
> wrote:
>
> > Srikanth,
> >
> > I think the most efficient use of the partitions would be to spread
> > all messages evenly across all partitions by *not* using a key. Then,
> > all of your consumers in the same consumer group would receive about
> > equal numbers of messages. What will you do with the messages as you
> > pull them off of Kafka?
> >
> > -Dave
> >
> >
> > -----Original Message-----
> > From: Srikanth [mailto:srikanth...@gmail.com]
> > Sent: Tuesday, May 03, 2016 12:12 PM
> > To: users@kafka.apache.org
> > Subject: Re: Hash partition of key with skew
> >
> > Jens,
> > Thanks for the link. That is something to consider. Of course it has
> > downsides too.
> >
> > Wesley,
> > That is some good info on hashing. We've explored a couple of these
> > options.
> > I see that you are hesitant to put these in production. We too want
> > to evaluate our options first.
> >
> > Dave,
> > Our need to do this is similar to Wesley's. And consumers of the topic can
> > be more efficient if they get records from one (or a very few) keys.
> > Why do you think it is not applicable to Kafka? Are you suggesting
> > that there are other ways to handle it when using Kafka?
> >
> > Srikanth
> >
> > On Tue, May 3, 2016 at 11:58 AM, Tauzell, Dave <
> > dave.tauz...@surescripts.com
> > > wrote:
> >
> > > Yeah, that makes sense for the target system (Cassandra, for
> > > example), but I don't see that you would need that for Kafka. Good
> > > info on hashing, though, that I am going to take a look at when I get
> > > time.
> > >
> > > -Dave
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: Wesley Chow [mailto:w...@chartbeat.com]
> > > Sent: Tuesday, May 03, 2016 10:51 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: Hash partition of key with skew
> > >
> > > I’m not the OP, but in our case, we sometimes want data locality.
> > > For example, suppose that we have 100 consumers that are building up
> > > a cache of customer -> data mapping. If customer data is spread
> > > randomly across all partitions then a query for that customer’s data
> > > would have to hit all 100 consumers. If customer data exhibits some
> > > locality, then queries for that data only hit a subset of consumers.
> > >
> > > Wes
> > >
> > >
> > > > On May 3, 2016, at 11:18 AM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
> > > >
> > > > Do you need the messages to be ordered in some way? Why pass a key if
> > > > you don't want all the messages to go to one partition?
> > > >
> > > > -Dave
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Wesley Chow [mailto:w...@chartbeat.com]
> > > > Sent: Tuesday, May 03, 2016 9:51 AM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Hash partition of key with skew
> > > >
> > > > I’ve come up with a couple of solutions, since we too have a power-law
> > > > distribution. However, we have not put anything into practice.
> > > >
> > > > Fixed Slicing
> > > >
> > > > One simple thing to do is to take each key and slice it into some fixed
> > > > number of partitions. So your function might be:
> > > >
> > > > ((hash(key) % num) + (r % 10)) % num
> > > >
> > > > where r varies per record (a counter or random value), in order to
> > > > distribute each key across 10 partitions. Or:
> > > >
> > > > hash(key + ‘0’) % num
> > > > hash(key + ‘1’) % num
> > > > …
> > > > hash(key + ‘9’) % num
> > > >
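> > > > A rough sketch of the salted-suffix variant as a small producer-side
> > > > helper (the slice count of 10, String keys, and hashCode() here are
> > > > placeholder choices, not anything Kafka requires):
> > > >
> > > > import java.util.concurrent.ThreadLocalRandom;
> > > >
> > > > // "Fixed slicing": each key is spread over SLICES salted sub-keys, so a
> > > > // single hot key fans out over up to SLICES different partitions.
> > > > public class SlicedKeyPartitioner {
> > > >     private static final int SLICES = 10;
> > > >
> > > >     // Pick a partition for (key, numPartitions). The slice index varies
> > > >     // per record, e.g. randomly or via a round-robin counter.
> > > >     public static int partition(String key, int numPartitions) {
> > > >         int slice = ThreadLocalRandom.current().nextInt(SLICES);
> > > >         int hash = (key + slice).hashCode() & 0x7fffffff; // hash(key + '0'..'9'), kept non-negative
> > > >         return hash % numPartitions;
> > > >     }
> > > > }
> > > >
> > > > Consumers that want all of one key then have to read and merge the
> > > > records for all 10 salted variants.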
> > > >
> > > > Hyperspace Hashing
> > > >
> > > > If your data is multi-dimensional, then you might find hyperspace
> > > > hashing useful. I’ll give a simple example, but it’s easy to generalize.
> > > > Suppose that you have two dimensions you’d like to partition on:
> > > > customer id (C) and city location (L). You’d like to be able to
> > > > subscribe to all data for some subset of customers, and you’d also
> > > > like to be able to subscribe to all data for some subset of locations.
> > > > Suppose that this data goes into a topic with 256 partitions.
> > > >
> > > > For any piece of data, you’d construct the partition it goes to like so:
> > > >
> > > > ((hash(C) % 16) << 4) + (hash(L) % 16)
> > > >
> > > > What that is basically saying is: take C and map it to 16 different
> > > > spots, and set that as the high 4 bits of an 8-bit int. Then take the
> > > > location, map it to 16 different spots, and set that as the lower 4
> > > > bits of the int. The resulting number is the partition that piece of
> > > > data goes to.
> > > >
> > > > Now if you want one particular C, you subscribe to the 16 partitions
> > > > that contain that C. If you want some particular L, you subscribe to
> > > > the 16 partitions that contain that L.
> > > >
> > > > You can extend this scheme to an arbitrary number of dimensions, subject
> > > > to the number of partitions in the topic, and you can vary the number of
> > > > bits that any particular dimension takes. This scheme suffers from a
> > > > combinatorial explosion of partitions if you really want to query on lots
> > > > of different dimensions, but you can see the Hyperdex paper for clues on
> > > > how to deal with this.
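> > > >
> > > > A small sketch of the 256-partition example above (class and method names
> > > > are mine; String inputs and hashCode() are stand-ins for whatever hash you
> > > > actually use):
> > > >
> > > > import java.util.ArrayList;
> > > > import java.util.List;
> > > >
> > > > // 2-dimensional hyperspace hashing: 4 high bits from the customer id,
> > > > // 4 low bits from the location, giving a partition in [0, 255].
> > > > public class HyperspacePartitioner {
> > > >
> > > >     static int bucket(String value) {              // one of 16 buckets
> > > >         return (value.hashCode() & 0x7fffffff) % 16;
> > > >     }
> > > >
> > > >     static int partition(String customerId, String location) {
> > > >         return (bucket(customerId) << 4) | bucket(location);
> > > >     }
> > > >
> > > >     // The 16 partitions that can hold a given customer (any location).
> > > >     static List<Integer> partitionsForCustomer(String customerId) {
> > > >         List<Integer> parts = new ArrayList<>();
> > > >         for (int loc = 0; loc < 16; loc++) {
> > > >             parts.add((bucket(customerId) << 4) | loc);
> > > >         }
> > > >         return parts;
> > > >     }
> > > >
> > > >     // The 16 partitions that can hold a given location (any customer).
> > > >     static List<Integer> partitionsForLocation(String location) {
> > > >         List<Integer> parts = new ArrayList<>();
> > > >         for (int cust = 0; cust < 16; cust++) {
> > > >             parts.add((cust << 4) | bucket(location));
> > > >         }
> > > >         return parts;
> > > >     }
> > > > }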
> > > >
> > > >
> > > > Unbalanced Hashing
> > > >
> > > > It’s easy to generate OK-but-not-great hash functions. One is DJB hash,
> > > > which relies on two empirically determined constants:
> > > >
> > > > http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
> > > >
> > > > (5381 and 33 in the above example)
> > > >
> > > > If you can do offline analysis, and your distribution doesn’t change
> > > > over time, then you can basically exhaustively search for two values
> > > > that produce a hash function that better distributes the load.
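> > > >
> > > > To make that concrete, here is a sketch of a DJB-style hash with the two
> > > > constants exposed, plus a scoring function an offline search could use
> > > > (the key counts are whatever you measure from your real traffic):
> > > >
> > > > import java.util.Map;
> > > >
> > > > // DJB2-style hash; 5381 and 33 are the usual constants. The idea is to
> > > > // try many (seed, multiplier) pairs against your observed key volumes
> > > > // and keep the pair that spreads the load most evenly.
> > > > public class TunableDjbHash {
> > > >
> > > >     static int hash(String key, int seed, int multiplier) {
> > > >         int h = seed;
> > > >         for (int i = 0; i < key.length(); i++) {
> > > >             h = h * multiplier + key.charAt(i);
> > > >         }
> > > >         return h & 0x7fffffff; // keep non-negative for the modulo below
> > > >     }
> > > >
> > > >     // Heaviest partition load for one candidate pair (lower is better).
> > > >     static long maxPartitionLoad(Map<String, Long> keyCounts,
> > > >                                  int numPartitions, int seed, int multiplier) {
> > > >         long[] load = new long[numPartitions];
> > > >         keyCounts.forEach((key, count) ->
> > > >                 load[hash(key, seed, multiplier) % numPartitions] += count);
> > > >         long max = 0;
> > > >         for (long l : load) max = Math.max(max, l);
> > > >         return max;
> > > >     }
> > > > }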
> > > >
> > > >
> > > > Greedy Knapsack
> > > >
> > > > But if you’re OK doing offline analysis and generating your own hash
> > > > function, then you can create one that’s simply a hard-coded list of
> > > > mappings for the heaviest keys, and that defaults to a regular hash
> > > > for the rest. The easiest way to do this programmatically is to use
> > > > a greedy algorithm:
> > > >
> > > >  for each heavy key, k:
> > > >    assign k to the partition with the least assigned weight
> > > >
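> > > > A rough sketch of that greedy pass (the per-key weights come from your
> > > > own offline counts; class and method names are mine):
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.Map;
> > > >
> > > > // Assign each heavy key to whichever partition currently carries the
> > > > // least weight. The resulting map can be hard coded (or loaded) into a
> > > > // custom partitioner; all other keys fall back to a normal hash.
> > > > public class GreedyKeyAssigner {
> > > >
> > > >     static Map<String, Integer> assign(Map<String, Long> keyWeights, int numPartitions) {
> > > >         long[] load = new long[numPartitions];
> > > >         Map<String, Integer> assignment = new HashMap<>();
> > > >         keyWeights.entrySet().stream()
> > > >                 .sorted((a, b) -> Long.compare(b.getValue(), a.getValue())) // heaviest first
> > > >                 .forEach(e -> {
> > > >                     int best = 0;
> > > >                     for (int p = 1; p < numPartitions; p++) {
> > > >                         if (load[p] < load[best]) best = p;
> > > >                     }
> > > >                     assignment.put(e.getKey(), best);
> > > >                     load[best] += e.getValue();
> > > >                 });
> > > >         return assignment;
> > > >     }
> > > > }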
> > > >
> > > > The advantage of fixed slicing and hyperspace hashing is that you don’t
> > > > have to know your distribution a priori, and it generally scales well as
> > > > you increase the number of keys. The disadvantage is that one key’s data
> > > > is split across multiple partitions.
> > > >
> > > > The advantage of unbalanced hashing and greedy knapsack is that you can
> > > > get close to an optimal partitioning scheme and all of one key resides
> > > > in one partition. The downside is that you need to do partition-mapping
> > > > management as your distribution changes over time.
> > > >
> > > > Hopefully that gives you some ideas!
> > > >
> > > > Wes
> > > >
> > > >
> > > >
> > > >> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> Not sure if this helps, but the way Loggly seems to do it is to
> > > >> have a separate topic for "noisy neighbors". See [1].
> > > >>
> > > >> [1]
> > > >> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
> > > >>
> > > >> Cheers,
> > > >> Jens
> > > >>
> > > >> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
> > > >>
> > > >>> Hello,
> > > >>>
> > > >>> Is there a recommendation for handling producer side
> > > >>> partitioning based on a key with skew?
> > > >>> We want to partition on something like clientId. Problem is, the
> > > >>> load per key is far from uniform: it's equally likely to see a key
> > > >>> with 3k occurrences/day as one with 100k/day or 65 million/day.
> > > >>> Cardinality of the key is around 1500 and there are approx 1 billion
> > > >>> records per day.
> > > >>> Partitioning by hashcode(key) % numOfPartitions will create a few
> > > >>> "hot partitions" and cause a few brokers (and consumer threads) to be
> > > >>> overloaded.
> > > >>> Maybe these partitions with heavy load are evenly distributed among
> > > >>> brokers, maybe they are not.
> > > >>>
> > > >>> I read KIP-22
> > > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>
> > > >>> that explains how one could write a custom partitioner.
> > > >>> I'd like to know how it was used to solve such data skew.
> > > >>> We can compute some statistics on key distribution offline and
> > > >>> use it in the partitioner.
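> > > >>>
> > > >>> Roughly, I was imagining something along these lines (just a sketch
> > > >>> against the Partitioner interface from KIP-22; how the heavy-key map
> > > >>> gets built and loaded is the open question):
> > > >>>
> > > >>> import java.util.HashMap;
> > > >>> import java.util.Map;
> > > >>> import org.apache.kafka.clients.producer.Partitioner;
> > > >>> import org.apache.kafka.common.Cluster;
> > > >>> import org.apache.kafka.common.utils.Utils;
> > > >>>
> > > >>> // Heavy keys get a partition chosen from offline statistics; everything
> > > >>> // else falls back to plain hashing. Assumes every record is keyed.
> > > >>> public class SkewAwarePartitioner implements Partitioner {
> > > >>>
> > > >>>     private final Map<String, Integer> heavyKeys = new HashMap<>();
> > > >>>
> > > >>>     @Override
> > > >>>     public void configure(Map<String, ?> configs) {
> > > >>>         // e.g. populate heavyKeys here from a file or config entry
> > > >>>     }
> > > >>>
> > > >>>     @Override
> > > >>>     public int partition(String topic, Object key, byte[] keyBytes,
> > > >>>                          Object value, byte[] valueBytes, Cluster cluster) {
> > > >>>         int numPartitions = cluster.partitionsForTopic(topic).size();
> > > >>>         Integer fixed = heavyKeys.get(String.valueOf(key));
> > > >>>         if (fixed != null) {
> > > >>>             return fixed % numPartitions;
> > > >>>         }
> > > >>>         return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
> > > >>>     }
> > > >>>
> > > >>>     @Override
> > > >>>     public void close() { }
> > > >>> }
> > > >>>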
> > > >>> Is that a good idea? Or is it way too much logic for a partitioner?
> > > >>> Anything else to consider?
> > > >>> Any thoughts or reference will be helpful.
> > > >>>
> > > >>> Thanks,
> > > >>> Srikanth
> > > >>>
> > > >> --
> > > >>
> > > >> Jens Rantil
> > > >> Backend Developer @ Tink
> > > >>
> > > >> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden For urgent
> > > >> matters you can reach me at +46-708-84 18 32.
> > > >
> > >
> > >
> >
>
