Jens,

Sorry, I'm very late to this thread, but I figured it might be worth following
up since this is a cool feature of the new consumer that isn't well known. You
actually have *quite* a bit of flexibility in controlling how partition
assignment happens. The key hook is the partition.assignment.strategy setting,
which allows you to plug in alternative assignment strategies.
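
For example, pointing a consumer at a custom strategy is just a config change.
A minimal sketch (com.example.LagAwareAssignor is a hypothetical class name
standing in for whatever you implement):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Swap in a custom assignor implementation (hypothetical class):
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          "com.example.LagAwareAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
        props, new StringDeserializer(), new StringDeserializer());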

The key to this is two decisions made when designing the consumer protocol:

1. The broker is not really involved in the assignment process beyond
coordinating all the members and selecting one of them to perform
assignment. In fact, it doesn't even know *what* is being assigned. The
protocol was originally designed specifically for consumers, but was
generalized before the first release. This means that all the broker sees is
the member it selected to perform assignment sending a byte[] blob to each of
the members. That data is opaque to the broker, but when used for consumers it
contains each member's partition assignment. The cool
thing about abstracting this out is that it also made it possible to use it
for something largely unrelated to consumer group partition assignment:
Kafka Connect uses this to assign connectors/tasks to workers. These two
problems look roughly similar (keep track of group members and tell them
what work they should be doing), but the details at the protocol level look
different (e.g., assigning topic-partitions vs assigning connectors &
tasks).

2. Within the format used by consumer groups, we allow for pluggable user
data that can be passed to the assignment process, so you can include and
process custom data during assignment. This data is carried in the
PartitionAssignor.Subscription object, and data specific to the assignment
strategy is returned in the PartitionAssignor.Assignment object.
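
Concretely, the hooks look roughly like this (paraphrased from the
PartitionAssignor interface in org.apache.kafka.clients.consumer.internals as
of 0.10; check trunk for the exact signatures):

public interface PartitionAssignor {
    // Called on every member; the returned Subscription (topics plus an
    // optional userData blob) is forwarded to the member doing assignment.
    Subscription subscription(Set<String> topics);

    // Called only on the member chosen to perform assignment; it sees
    // everyone's Subscription (userData included) and returns an
    // Assignment (partitions plus optional userData) for each member.
    Map<String, Assignment> assign(Cluster metadata,
                                   Map<String, Subscription> subscriptions);

    // Invoked on each member when its Assignment comes back.
    void onAssignment(Assignment assignment);

    String name();
}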

The flexibility here may be obscured a bit even if you're looking at the
source code for commonly used PartitionAssignors like RoundRobinAssignor
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java)
because they happen to share a common base class that hides some of the
complexity (which is good -- for the vast majority of simple assignors, the
same basic state is required and we avoid a bunch of code duplication).
However, if you implement an alternative to AbstractPartitionAssignor or
override some of its methods, you could include extra data.

In your case, you might be able to include lag information so that it could
be taken into account during assignment.
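
Here's a rough sketch of what that could look like. Very much a sketch, not a
tested implementation: AbstractPartitionAssignor is what the built-in
assignors extend, but currentLag() is a hypothetical hook for however your
application measures its own per-partition lag, and this assumes all members
subscribe to the same topics:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

public class LagAwareAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() { return "lag-aware"; }

    // Hypothetical hook: however this member measures the lag of the
    // partitions it currently owns.
    protected Map<TopicPartition, Long> currentLag() {
        return Collections.emptyMap();
    }

    // Ship this member's lag view to the assignment leader as userData.
    @Override
    public Subscription subscription(Set<String> topics) {
        return new Subscription(new ArrayList<>(topics), encodeLag(currentLag()));
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        // Merge every member's lag report; unreported partitions count as 0.
        Map<TopicPartition, Long> lag = new HashMap<>();
        for (Subscription sub : subscriptions.values())
            lag.putAll(decodeLag(sub.userData()));

        List<TopicPartition> partitions = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
            for (int p = 0; p < e.getValue(); p++)
                partitions.add(new TopicPartition(e.getKey(), p));
        // Greedy bin packing: hand out partitions largest-lag-first, each to
        // whichever member has the smallest total assigned lag so far.
        partitions.sort((a, b) ->
                Long.compare(lag.getOrDefault(b, 0L), lag.getOrDefault(a, 0L)));

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        Map<String, Long> load = new HashMap<>();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<TopicPartition>());
            load.put(member, 0L);
        }
        for (TopicPartition tp : partitions) {
            String least = Collections.min(load.entrySet(),
                    Map.Entry.<String, Long>comparingByValue()).getKey();
            assignment.get(least).add(tp);
            load.put(least, load.get(least) + lag.getOrDefault(tp, 0L));
        }
        return assignment;
    }

    // userData layout: count, then (topic length, topic bytes, partition, lag).
    static ByteBuffer encodeLag(Map<TopicPartition, Long> lag) {
        int size = 4;
        for (TopicPartition tp : lag.keySet())
            size += 4 + tp.topic().getBytes(StandardCharsets.UTF_8).length + 12;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(lag.size());
        for (Map.Entry<TopicPartition, Long> e : lag.entrySet()) {
            byte[] topic = e.getKey().topic().getBytes(StandardCharsets.UTF_8);
            buf.putInt(topic.length).put(topic);
            buf.putInt(e.getKey().partition()).putLong(e.getValue());
        }
        buf.flip();
        return buf;
    }

    static Map<TopicPartition, Long> decodeLag(ByteBuffer buf) {
        Map<TopicPartition, Long> lag = new HashMap<>();
        if (buf == null)
            return lag;
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            byte[] topic = new byte[buf.getInt()];
            buf.get(topic);
            String t = new String(topic, StandardCharsets.UTF_8);
            lag.put(new TopicPartition(t, buf.getInt()), buf.getLong());
        }
        return lag;
    }
}

You'd then put that class on each consumer's classpath and reference it via
partition.assignment.strategy as above; since that config takes a list, you
can list it alongside an existing strategy during a rolling upgrade.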

I think there are two gaps in making this work really well in practice at the
moment. First, this approach works well as long as the lag for each partition
can be reported by some member of the generation being assigned. That holds
when new members are added, but not when members leave, since a departing
member's lag data leaves with it. The
second gap is that you don't have an easy way to trigger reassignment other
than leave + rejoin, but presumably in this assignor you want to actively
monitor the lag and perform reassignment if you hit some trigger level of
lag where reassignment could possibly help. (As an alternative, you might
be able to have the Assignor directly check consumer lag for each partition
while it's doing assignment. I'm not sure that's possible with the current
data exposed to it and obviously you incur an extra network round trip for
that request.)
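
(For what it's worth, the leave + rejoin workaround is at least easy to
express from application code; a sketch, assuming an existing consumer
subscribed to a hypothetical "my-topic":

// Force a rebalance by leaving and rejoining the group. Heavy-handed,
// but it's the only lever exposed to applications today.
consumer.unsubscribe();
consumer.subscribe(Arrays.asList("my-topic"));
consumer.poll(0); // the next poll rejoins the group, triggering reassignment
)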

I think looking at more assignment strategies and trying to map out what
else might need to be exposed to PartitionAssignors to support a wider
variety of assignment strategies would be worthwhile. It's also worth
pointing out that Kafka Streams leverages this functionality in
StreamsPartitionAssignor (I think primarily to handle co-partitioning for
joins, but someone more familiar with streams can correct me if I'm missing
something else). We tend to see more uses of this functionality at the
level of frameworks that need careful customization like this (i.e. Connect
& Streams), but certainly regular consumers could use it as well.

-Ewen

On Mon, Sep 12, 2016 at 1:16 AM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi Stevo,
>
> Thank you for your response. Yes, I understand there can only be one active
> consumer per partition and I understand that partitions should be spread
> evenly. However, there will almost always be cases when they are somewhat
> unbalanced.
>
> That said, how consumers are distributed among partitions is not by design.
> Let me clarify: My question was whether there has been any discussion on
> 1) having the rebalancing algorithm taking lag into account to assign
> partitions to consumers in such way that it tries to spread sum of lag per
> consumer as evenly as possible and 2) possibly triggering rebalancing
> algorithm if lag could be considerably improved.
>
> I have two specific use cases:
>
>    1. I recently added new partitions to a topic and reset offsets to
>    beginning. This meant that the old partitions had much more data than
>    the new partitions. The rebalance when adding new nodes was definitely
>    not "fair" in terms of lag. I had to scale up to have as many consumers
>    as partitions to be sure that a single consumer was not assigned two of
>    the old partitions.
>    2. Autoscaling. When consumers come and go based on CPU load or
>    whatever, they will invariably have different numbers of partitions
>    assigned to them. A lag-aware rebalancer would definitely be a small
>    optimisation that could do quite a lot here.
>
> If there hasn't been any discussion of this, I'll consider writing a KIP for it.
>
> Cheers,
> Jens
>
> On Mon, Sep 12, 2016 at 12:25 AM Stevo Slavić <ssla...@gmail.com> wrote:
>
> > Hello Jens,
> >
> > By design there can be only one active consumer per consumer group per
> > partition at a time; only one thread, after being assigned (acquiring a
> > lock), moves the offset for the consumer group for that partition, so
> > there are no concurrency problems. Typically that assignment lasts until
> > rebalancing gets triggered, e.g. because a consumer instance joined or
> > left the group.
> >
> > To have all active consumers evenly loaded, one has to have the
> > publishing side distribute messages evenly across the partitions with an
> > appropriate partitioning strategy. Check which one is being used, and
> > with which settings. It may be tuned well already, but maybe not for your
> > test: e.g. if it publishes 20k messages to one partition before moving to
> > the next one (assuming lots of messages will be published, so it batches
> > writes, trading off temporarily uneven balancing for better throughput),
> > then a test that publishes only 40k messages will see only two partitions
> > actually get data.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Sun, Sep 11, 2016, 22:49 Jens Rantil <jens.ran...@tink.se> wrote:
> >
> > > Hi,
> > >
> > > We have a partition which has many more messages than all other
> > partitions.
> > > Does anyone know if there has been any discussions on having a
> partition
> > > balancer that tries to balance consumers based on consumer group lag?
> > >
> > > Example:
> > >
> > > [
> > >     { partition: 0, consumers: "192.168.1.2", lag: 20000 },
> > >     { partition: 1, consumers: "192.168.1.2", lag: 20000 },
> > >     { partition: 2, consumers: "192.168.1.3", lag: 0 },
> > > ]
> > >
> > > Clearly, it would be more optimal if "192.168.1.3" also took care of
> > > partition 1.
> > >
> > > Cheers,
> > > Jens
> > >
> > >
> > > --
> > > Jens Rantil
> > > Backend engineer
> > > Tink AB
> > >
> > > Email: jens.ran...@tink.se
> > > Phone: +46 708 84 18 32
> > > Web: www.tink.se
> > >
> > > Facebook <https://www.facebook.com/#!/tink.se> LinkedIn
> > > <http://www.linkedin.com/company/2735919> Twitter
> > > <https://twitter.com/tink>
> > >
> >
> --
>
> Jens Rantil
> Backend Developer @ Tink
>
> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> For urgent matters you can reach me at +46-708-84 18 32.
>



-- 
Thanks,
Ewen
