Jens,

Sorry, I'm very late to this thread, but I figure it might be worth following up since I think this is a cool feature of the new consumer that isn't well known. You actually have *quite* a bit of flexibility in controlling how partition assignment happens. The key hook is the partition.assignment.strategy setting, which allows you to plug in alternative assignment strategies.
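For example (com.example.LagAwareAssignor is a made-up class name standing in for a custom implementation), wiring one in is just consumer configuration:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // The hook: a list of assignor class names. The class below is
    // hypothetical -- it just needs to be on the consumer's classpath.
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              "com.example.LagAwareAssignor");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

Since the setting accepts a list, you can also name a second strategy shared with older group members, which lets you roll out a new assignor without downtime.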
The key here lies in two decisions made when designing the consumer protocol:

1. The broker is not really involved in the assignment process beyond coordinating all the members and selecting one of them to perform assignment. In fact, it doesn't even know *what* is being assigned. The protocol was originally designed specifically for consumers, but was generalized before the first release. This means all the broker sees is the group member it selects to perform assignment sending byte[] data to each of the members. That data is opaque to the broker, but when used for consumers it contains the partition assignment for each member. The cool thing about abstracting this out is that it also made it possible to use the protocol for something largely unrelated to consumer group partition assignment: Kafka Connect uses it to assign connectors/tasks to workers. These two problems look roughly similar (keep track of group members and tell them what work they should be doing), but the details at the protocol level are different (e.g., assigning topic-partitions vs. assigning connectors & tasks).

2. Within the format used by consumer groups, we allow for pluggable user data that can be passed to the assignment process, letting you include & process custom data during assignment. This data is included in the PartitionAssignor.Subscription object, and data specific to the assignment strategy is returned in the PartitionAssignor.Assignment object.

The flexibility here may be obscured a bit even if you're looking at the source code for commonly used PartitionAssignors like RoundRobinAssignor (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java), because they happen to share a common base class that hides some of the complexity (which is good -- for the vast majority of simple assignors, the same basic state is required and we avoid a bunch of code duplication). However, if you implement an alternative to AbstractPartitionAssignor or override some of its methods, you can include extra data. In your case, you might be able to include lag information so that it can be taken into account during assignment; a rough sketch of what that could look like is below.

I think there are two gaps in making this work really well in practice at the moment. First, this approach works well as long as the consumer lag for each partition is available from the members of the generation being assigned. That is true when new members are added, but not when they leave -- a departing member takes its lag information with it. The second gap is that you don't have an easy way to trigger reassignment other than leaving and rejoining the group, but presumably in this assignor you want to actively monitor the lag and perform reassignment if you hit some trigger level of lag where reassignment could plausibly help. (As an alternative, you might be able to have the assignor directly check consumer lag for each partition while it's doing assignment. I'm not sure that's possible with the data currently exposed to it, and you obviously incur an extra network round trip for that request.)

I think it would be worthwhile to look at more assignment strategies and map out what else might need to be exposed to PartitionAssignors to support a wider variety of them. It's also worth pointing out that Kafka Streams leverages this functionality in StreamsPartitionAssignor (I think primarily to handle co-partitioning for joins, but someone more familiar with Streams can correct me if I'm missing something else).
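To make the shape of this concrete, here's the rough, untested sketch I mentioned, written directly against the PartitionAssignor interface (currently in the clients' internals package) rather than AbstractPartitionAssignor so it can ship custom userData. The localLagPerPartition() hook and the greedy balancing are illustrative assumptions, not a recommended implementation -- a real version would need to handle per-member subscription differences, missing lag data, and the gaps described above:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.*;

    import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;

    public class LagAwareAssignor implements PartitionAssignor {

        @Override
        public String name() {
            return "lag-aware";
        }

        @Override
        public Subscription subscription(Set<String> topics) {
            // Ship this member's per-partition lag as the opaque userData
            // bytes that the member performing assignment will receive.
            return new Subscription(new ArrayList<>(topics),
                                    encodeLag(localLagPerPartition()));
        }

        @Override
        public Map<String, Assignment> assign(Cluster metadata,
                                              Map<String, Subscription> subscriptions) {
            // Merge the lag reported by all members (note the first gap above:
            // partitions owned by members that already left report nothing).
            Map<TopicPartition, Long> lag = new HashMap<>();
            Set<String> topics = new HashSet<>();
            for (Subscription sub : subscriptions.values()) {
                topics.addAll(sub.topics());
                lag.putAll(decodeLag(sub.userData()));
            }

            List<TopicPartition> partitions = new ArrayList<>();
            for (String topic : topics) {
                Integer count = metadata.partitionCountForTopic(topic);
                if (count != null)
                    for (int p = 0; p < count; p++)
                        partitions.add(new TopicPartition(topic, p));
            }

            // Greedy balancing: hand out partitions in order of decreasing
            // lag, always to the member with the smallest total lag so far.
            partitions.sort((a, b) -> Long.compare(lag.getOrDefault(b, 0L),
                                                   lag.getOrDefault(a, 0L)));
            Map<String, List<TopicPartition>> assigned = new HashMap<>();
            Map<String, Long> load = new HashMap<>();
            for (String member : subscriptions.keySet()) {
                assigned.put(member, new ArrayList<>());
                load.put(member, 0L);
            }
            for (TopicPartition tp : partitions) {
                String member = Collections.min(load.entrySet(),
                        Map.Entry.comparingByValue()).getKey();
                assigned.get(member).add(tp);
                load.put(member, load.get(member) + lag.getOrDefault(tp, 0L));
            }

            Map<String, Assignment> result = new HashMap<>();
            for (Map.Entry<String, List<TopicPartition>> e : assigned.entrySet())
                result.put(e.getKey(), new Assignment(e.getValue()));
            return result;
        }

        @Override
        public void onAssignment(Assignment assignment) {
            // Nothing for the member to do with the result in this sketch.
        }

        // Hypothetical hook: a real implementation would compare committed
        // offsets against log end offsets for the partitions it owns.
        private Map<TopicPartition, Long> localLagPerPartition() {
            return Collections.emptyMap();
        }

        // Simple userData layout: count, then (topic, partition, lag) triples.
        private ByteBuffer encodeLag(Map<TopicPartition, Long> lag) {
            int size = 4;
            for (TopicPartition tp : lag.keySet())
                size += 4 + tp.topic().getBytes(StandardCharsets.UTF_8).length + 12;
            ByteBuffer buf = ByteBuffer.allocate(size);
            buf.putInt(lag.size());
            for (Map.Entry<TopicPartition, Long> e : lag.entrySet()) {
                byte[] topic = e.getKey().topic().getBytes(StandardCharsets.UTF_8);
                buf.putInt(topic.length);
                buf.put(topic);
                buf.putInt(e.getKey().partition());
                buf.putLong(e.getValue());
            }
            buf.flip();
            return buf;
        }

        private Map<TopicPartition, Long> decodeLag(ByteBuffer data) {
            Map<TopicPartition, Long> lag = new HashMap<>();
            if (data == null || !data.hasRemaining())
                return lag;
            int n = data.getInt();
            for (int i = 0; i < n; i++) {
                byte[] topic = new byte[data.getInt()];
                data.get(topic);
                lag.put(new TopicPartition(new String(topic, StandardCharsets.UTF_8),
                                           data.getInt()), data.getLong());
            }
            return lag;
        }
    }

The wire format of the userData is entirely up to the assignor -- the members just need to agree on it -- which is the same freedom Connect and Streams exploit for their own assignment metadata.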
We tend to see more uses of this functionality at the level of frameworks that need careful customization like this (i.e., Connect & Streams), but certainly regular consumers could use it as well.

-Ewen

On Mon, Sep 12, 2016 at 1:16 AM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi Stevo,
>
> Thank you for your response. Yes, I understand there can only be one
> active consumer per partition and I understand that partitions should be
> spread evenly. However, there will almost always be cases when they are
> somewhat unbalanced.
>
> That said, how consumers are distributed among partitions is not by
> design. Let me clarify: My question was whether there has been any
> discussion on 1) having the rebalancing algorithm take lag into account
> to assign partitions to consumers in such a way that it tries to spread
> the sum of lag per consumer as evenly as possible, and 2) possibly
> triggering the rebalancing algorithm if lag could be considerably
> improved.
>
> I have two specific use cases:
>
> 1. I recently added new partitions to a topic and reset offsets to the
>    beginning. This meant that the old partitions had much more data than
>    the new partitions. The rebalance when adding new nodes was definitely
>    not "fair" in terms of lag. I had to scale up to have an equal number
>    of consumers and partitions to be sure that a single consumer was not
>    assigned two of the old partitions.
> 2. Autoscaling. When consumers come and go based on CPU load or
>    whatever, they will invariably have different numbers of partitions
>    assigned to them. A lag-aware rebalancer would definitely be a small
>    optimisation that could do quite a lot here.
>
> If no discussion of this has been done, I'll consider writing a KIP for
> it.
>
> Cheers,
> Jens
>
> On Mon, Sep 12, 2016 at 12:25 AM Stevo Slavić <ssla...@gmail.com> wrote:
>
> > Hello Jens,
> >
> > By design there can be only one active consumer per consumer group per
> > partition at a time; only one thread, after being assigned (acquiring a
> > lock), moves the offset for the consumer group for the partition, so
> > there are no concurrency problems. Typically that assignment lasts
> > until rebalancing gets triggered, e.g. because a consumer instance
> > joined or left the group.
> >
> > To have all active consumers evenly loaded, one has to have publishing
> > evenly distribute messages across the partitions with an appropriate
> > partitioning strategy. Check which one is being used with which
> > settings. It may be tuned well already, but maybe not for your test:
> > e.g., it publishes 20k messages to one partition before publishing to
> > the next one (it assumes lots of messages will be published, so it
> > batches writes, trading off temporarily uneven balancing for better
> > throughput), so if the test publishes only 40k messages, only two
> > partitions will actually get the data.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Sun, Sep 11, 2016, 22:49 Jens Rantil <jens.ran...@tink.se> wrote:
> >
> > > Hi,
> > >
> > > We have a partition which has many more messages than all other
> > > partitions. Does anyone know if there has been any discussion on
> > > having a partition balancer that tries to balance consumers based on
> > > consumer group lag?
> > >
> > > Example:
> > >
> > > [
> > >   { partition: 0, consumers: "192.168.1.2", lag: 20000 },
> > >   { partition: 1, consumers: "192.168.1.2", lag: 20000 },
> > >   { partition: 2, consumers: "192.168.1.3", lag: 0 },
> > > ]
> > >
> > > Clearly, it would be more optimal if "192.168.1.3" also takes care of
> > > partition 1.
> > >
> > > Cheers,
> > > Jens
> > >
> > > --
> > > Jens Rantil
> > > Backend engineer
> > > Tink AB
> > >
> > > Email: jens.ran...@tink.se
> > > Phone: +46 708 84 18 32
> > > Web: www.tink.se
> > >
> > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > > <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> > > Twitter <https://twitter.com/tink>
>
> --
> Jens Rantil
> Backend Developer @ Tink
>
> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> For urgent matters you can reach me at +46-708-84 18 32.

--
Thanks,
Ewen