Hi Ewen,

Thanks for the detailed explanation. So it's basically the version of the
embedded consumer protocol structures (ProtocolMetadata/MemberAssignment), not
the version of the assignment strategy as I thought. Should I use version=0 in
protocol exchanges for now? (I'm
building a client in Node.js for 0.9:
https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js )
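For reference, here is a minimal Python sketch of how a client might encode and decode the consumer MemberAssignment structure with version 0. It assumes the layout from the Kafka protocol guide (Version => int16, PartitionAssignment => [Topic [Partition]], UserData => bytes) and the standard big-endian Kafka primitives: int16-length strings, int32 array counts, and int32-length byte arrays, where -1 means null. The function names are hypothetical and do not come from any Kafka client.

```python
import struct
from typing import Dict, List, Optional, Tuple


def encode_member_assignment(
    version: int,
    assignment: Dict[str, List[int]],
    user_data: Optional[bytes] = None,
) -> bytes:
    """MemberAssignment => Version PartitionAssignment UserData (big-endian)."""
    out = struct.pack(">h", version)                 # Version => int16
    out += struct.pack(">i", len(assignment))        # [Topic ...] array length
    for topic, partitions in assignment.items():
        data = topic.encode("utf-8")
        out += struct.pack(">h", len(data)) + data   # Topic => string
        out += struct.pack(">i", len(partitions))    # [Partition] array length
        for partition in partitions:
            out += struct.pack(">i", partition)      # Partition => int32
    if user_data is None:
        out += struct.pack(">i", -1)                 # null UserData
    else:
        out += struct.pack(">i", len(user_data)) + user_data
    return out


def decode_member_assignment(
    buf: bytes,
) -> Tuple[int, Dict[str, List[int]], Optional[bytes]]:
    """Inverse of encode_member_assignment, as a group member would run it."""
    pos = 0

    def read(fmt: str) -> int:
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    version = read(">h")
    assignment: Dict[str, List[int]] = {}
    for _ in range(read(">i")):
        length = read(">h")
        topic = buf[pos:pos + length].decode("utf-8")
        pos += length
        assignment[topic] = [read(">i") for _ in range(read(">i"))]
    ud_len = read(">i")
    user_data = None if ud_len < 0 else buf[pos:pos + ud_len]
    return version, assignment, user_data
```

For example, `encode_member_assignment(0, {"orders": [0, 2]})` produces 30 bytes that start with the int16 version `0x0000`, and decoding those bytes returns `(0, {'orders': [0, 2]}, None)`.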

On Thu, 24 Dec 2015 at 10:15 Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Oleksiy,
>
> The join group protocol is general enough to handle multiple types of group
> membership, not just consumers. Kafka Connect, for example, uses it to form
> a group of workers, which split connector tasks between members instead of
> topic partitions.
>
> In order to make this work and allow flexibility in how assignment is
> handled, the protocol is divided into two layers. The primary join group
> protocol only a) keeps track of group membership and b) selects a group
> protocol that all members agree they can work with. At this level, there's
> no version information, no info about consumer subscriptions, and no
> knowledge of partition assignment strategies other than the names and
> opaque metadata submitted by clients.
>
> The "embedded" layer is where the version info you're setting lives. The
> brokers never even parse it: they collect it and send it to one of the group
> members (the leader), which decodes it and computes the assignment. The
> leader returns the result to the broker, which passes it on to the members
> (again without decoding it; the broker just forwards the appropriate info to
> each member).
>
> The version is included specifically in the consumer protocol to allow us
> to extend the format in the future. For example, if we needed to add or
> change the way subscriptions are expressed, we could increase that version
> number and update the message format. In other words, it is the mechanism
> we have chosen *only for the consumer embedded protocol* to allow metadata
> format changes. (Note that the consumer embedded protocol also has *yet
> another* layer of data, called "UserData" in the protocol documentation.
> This is custom data that the consumer's pluggable partition assignment
> strategy might want to include; e.g. if you were doing resource-based
> assignment you might need to include info like the number of CPUs, which is
> specific to that assignment strategy.)
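Because the broker treats the consumer ProtocolMetadata as opaque bytes, the client has to serialize the Version, Subscription and UserData fields itself. Below is a minimal Python sketch of that layout (the same one quoted from the protocol guide later in this thread), assuming the standard big-endian Kafka primitives: int16-length strings, an int32 array count, and int32-length bytes with -1 for null. The helper names are hypothetical; only the group leader would ever need the decoder.

```python
import struct
from typing import List, Optional, Tuple


def encode_subscription(
    version: int, topics: List[str], user_data: Optional[bytes] = None
) -> bytes:
    """ProtocolMetadata => Version Subscription UserData (big-endian)."""
    out = struct.pack(">h", version)              # Version => int16
    out += struct.pack(">i", len(topics))         # [Topic] array length => int32
    for topic in topics:
        data = topic.encode("utf-8")
        out += struct.pack(">h", len(data)) + data  # Topic => int16 length + bytes
    if user_data is None:
        out += struct.pack(">i", -1)              # null UserData => length -1
    else:
        out += struct.pack(">i", len(user_data)) + user_data
    return out


def decode_subscription(buf: bytes) -> Tuple[int, List[str], Optional[bytes]]:
    """Inverse of encode_subscription, as the group leader would run it."""
    pos = 0

    def read(fmt: str) -> int:
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    version = read(">h")
    topics = []
    for _ in range(read(">i")):
        length = read(">h")
        topics.append(buf[pos:pos + length].decode("utf-8"))
        pos += length
    ud_len = read(">i")
    user_data = None if ud_len < 0 else buf[pos:pos + ud_len]
    return version, topics, user_data
```

For example, `encode_subscription(0, ["orders", "payments"])` yields 28 bytes, and `decode_subscription` on them returns `(0, ['orders', 'payments'], None)`. Bumping the first argument is the format-versioning mechanism described above; the broker would forward the bytes unchanged either way.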
>
> The broker only looks at the ProtocolName (which is equivalent to
> AssignmentStrategy for consumers) when choosing which protocol to use for
> consumers. If you want to version those in an incompatible way (i.e. you
> can't handle the change just by updating the format of your metadata), you
> should include version info in the ProtocolName itself to ensure the group
> coordinator broker can differentiate them, e.g. round-robin vs
> round-robin-2. But think carefully about whether that change is
> necessary. In many cases, if you're not adding any metadata, you'll be fine
> keeping the same name, since one member is selected to perform the
> assignment and every other member just needs to respect whatever assignment
> it makes. And if you're simply switching to a completely different
> assignment strategy (e.g. from range to round-robin), the name itself is
> enough: do one rolling bounce of all consumers that adds round-robin as an
> option, then a second bounce that removes range.
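The two-bounce switch can be traced with a simplified Python model of name-only protocol selection: candidates are the names every member lists, each member votes for its first candidate, and the most votes wins. The member ids `c1` to `c3`, the choice to list the new strategy first, and the tie-breaking (first name counted) are assumptions of this sketch, not details of the broker code.

```python
from collections import Counter
from typing import Dict, List


def select_protocol(members: Dict[str, List[str]]) -> str:
    """Simplified name-only selection; metadata bytes play no part here."""
    # Candidates: strategy names listed by every member.
    candidates = set.intersection(*(set(names) for names in members.values()))
    if not candidates:
        raise ValueError("no protocol is supported by every member")
    # Each member votes for the first candidate in its own preference list.
    votes = Counter(
        next(name for name in names if name in candidates)
        for names in members.values()
    )
    return votes.most_common(1)[0][0]


# Hypothetical three-member group switching from "range" to "round-robin".
phases = [
    ("before", {"c1": ["range"], "c2": ["range"], "c3": ["range"]}),
    ("bounce 1, partway",
     {"c1": ["round-robin", "range"], "c2": ["range"], "c3": ["range"]}),
    ("bounce 1, done", {m: ["round-robin", "range"] for m in ("c1", "c2", "c3")}),
    ("bounce 2, partway",
     {"c1": ["round-robin"], "c2": ["round-robin", "range"],
      "c3": ["round-robin", "range"]}),
    ("bounce 2, done", {m: ["round-robin"] for m in ("c1", "c2", "c3")}),
]

for label, members in phases:
    print(f"{label}: {select_protocol(members)}")
```

In this model the group stays on range until every member lists round-robin, and at every step there is still a protocol that all members share. Dropping range from one member before the others had added round-robin would leave no common name; the sketch raises `ValueError` there and does not model how the real broker responds.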
>
> We considered other options when designing this protocol, but decided this
> was the best tradeoff. The current protocol is already pretty complex and
> multi-layered and the alternatives that tried to build in versioning at
> this level too were even more complex and confusing.
>
> -Ewen
>
>
>
> On Wed, Dec 23, 2015 at 10:45 PM, Oleksiy Krivoshey <oleks...@gmail.com>
> wrote:
>
> > Hi Ewen,
> >
> > I specify the version in the ProtocolMetadata structure, as described in
> > this document:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse
> >
> > ---------------
> > ProtocolType => "consumer"
> >
> > ProtocolName => AssignmentStrategy
> >   AssignmentStrategy => string
> >
> > ProtocolMetadata => Version Subscription UserData
> >   Version => int16
> >   Subscription => [Topic]
> >     Topic => string
> >   UserData => bytes
> > -----------------
> >
> > Maybe I misunderstood the purpose of this version field?
> >
> > On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > Oleksiy,
> > >
> > > Where are you specifying the version? Unless I'm missing something, the
> > > JoinGroup protocol doesn't include versions so I'm not sure I understand
> > > the examples you are giving. Are the version numbers included in the
> > > per-protocol metadata?
> > >
> > > You can see exactly how the consumer coordinator on the broker selects the
> > > protocol here:
> > >
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
> > > It takes the candidate protocols (those supported by all consumers), has
> > > each consumer "vote" for whichever candidate appears first in its own list
> > > of strategies, and then uses the one with the most votes.
> > >
> > > Is it possible your example is behaving the way it is because it actually
> > > has duplicates for "strategyX", and in the last case it chooses the first
> > > strategyX despite the conflicting versions?
> > >
> > > -Ewen
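Ewen's duplicate-name explanation can be checked against the three examples in the original question with a small Python model. It assumes each member sends an ordered list of (name, metadata) pairs, with an int standing in for the Version field inside the opaque metadata bytes, and that the leader receives, for each member, the metadata of the first entry whose name matches the selected protocol. `select_protocol` and `metadata_for_leader` are hypothetical helpers modeled on the description above, not the broker's code.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical model: (protocol_name, metadata) pairs in preference order.
# The int stands in for the Version field the broker never parses.
Protocols = List[Tuple[str, int]]


def select_protocol(members: Dict[str, Protocols]) -> str:
    """Pick the group protocol using protocol names only."""
    # Candidates: names that every member supports.
    candidates = set.intersection(
        *({name for name, _ in protos} for protos in members.values())
    )
    if not candidates:
        raise ValueError("no protocol is supported by every member")
    # Each member votes for the first candidate in its own list.
    votes = Counter(
        next(name for name, _ in protos if name in candidates)
        for protos in members.values()
    )
    # Most votes wins; ties go to the first name counted (a simplification).
    return votes.most_common(1)[0][0]


def metadata_for_leader(members: Dict[str, Protocols], protocol: str) -> Dict[str, int]:
    """Per member, return the metadata of the FIRST entry named `protocol`."""
    return {
        member: next(meta for name, meta in protos if name == protocol)
        for member, protos in members.items()
    }


cases = {
    "0 then 1": {
        "consumer1": [("strategyX", 0), ("strategyX", 1)],
        "consumer2": [("strategyX", 0), ("strategyX", 1)],
    },
    "1 then 0": {
        "consumer1": [("strategyX", 1), ("strategyX", 0)],
        "consumer2": [("strategyX", 1), ("strategyX", 0)],
    },
    "10/20 then 1": {
        "consumer1": [("strategyX", 10), ("strategyX", 1)],
        "consumer2": [("strategyX", 20), ("strategyX", 1)],
    },
}

for label, members in cases.items():
    chosen = select_protocol(members)
    print(label, chosen, metadata_for_leader(members, chosen))
```

Since every entry has the same name, only one candidate exists and the leader sees whichever version each member listed first: 0 and 0, then 1 and 1, then 10 and 20. That matches the reported behaviour, and it is why a version bump belongs inside the metadata rather than in duplicate protocol entries.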
> > >
> > > On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <oleks...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I can't understand how protocol upgrades (to a newer version) are
> > > > supposed to work. When I send a JoinGroupRequest with a list of
> > > > assignment protocols (same protocol name, different versions), the first
> > > > protocol/version always gets picked as the member's version. Even if all
> > > > consumers in the group are configured with both versions, the
> > > > coordinator always selects the first specified version, not the one with
> > > > the highest version number.
> > > >
> > > > So for example:
> > > > consumer1: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > > consumer2: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > >
> > > > Both members show up with version 0 in the JoinGroupResponse sent to the
> > > > leader. If I reverse the order:
> > > >
> > > > consumer1: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > > consumer2: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > >
> > > > Both will be assigned version 1.
> > > >
> > > > In this case:
> > > >
> > > > consumer1: [ {name: strategyX, version: 10}, {name: strategyX, version: 1} ]
> > > > consumer2: [ {name: strategyX, version: 20}, {name: strategyX, version: 1} ]
> > > >
> > > > Kafka keeps trying to rebalance the group without success, because
> > > > consumer1 has version 10 and consumer2 has version 20 in the
> > > > JoinGroupResponse.
> > > >
> > > > Can anyone please explain how a protocol version upgrade is supposed to
> > > > work?
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
