Hi Ewen,

Thanks for the detailed explanation. So it's basically the version of the MemberAssignment structure, not the version of the assignment strategy as I thought. Should I use version=0 in protocol exchanges for now? (I'm building a client in Node.js for 0.9: https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js )
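For reference, here is a minimal sketch (in Python rather than Node.js, purely for illustration) of how the consumer ProtocolMetadata structure quoted further down the thread might be serialized with Version = 0. The function names are hypothetical. The encoding assumes the usual Kafka wire conventions: big-endian integers, an int16 length prefix for strings, and an int32 length prefix for arrays and bytes, with -1 meaning null bytes.

```python
import struct


def encode_string(value: str) -> bytes:
    """Kafka-style string: int16 length followed by UTF-8 bytes (assumed convention)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_bytes(value) -> bytes:
    """Kafka-style bytes: int32 length followed by raw bytes; -1 length means null."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def encode_consumer_metadata(topics, user_data=None, version=0) -> bytes:
    """Hypothetical helper: ProtocolMetadata => Version Subscription UserData."""
    out = struct.pack(">h", version)      # Version => int16
    out += struct.pack(">i", len(topics))  # Subscription => [Topic], int32 count
    for topic in topics:
        out += encode_string(topic)        # Topic => string
    out += encode_bytes(user_data)         # UserData => bytes
    return out


if __name__ == "__main__":
    print(encode_consumer_metadata(["my-topic"]).hex())
```

The broker treats this blob as opaque, so the version only matters to whichever group member ends up decoding it.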
On Thu, 24 Dec 2015 at 10:15 Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Oleksiy,
>
> The join group protocol is general enough to handle multiple types of group
> membership, not just consumers. This is used in Kafka Connect to form a
> group of workers (which, instead of splitting topic partitions between
> members, splits connector tasks).
>
> In order to make this work and allow flexibility in how assignment is
> handled, the protocol is divided into two layers. The primary join group
> protocol only a) keeps track of group membership and b) selects a group
> protocol that all members agree they can work with. At this level, there's
> no version information, no info about consumer subscriptions, and no
> knowledge of partition assignment strategies other than the names and
> opaque metadata submitted by clients.
>
> The "embedded" layer is where the version info you're setting is specified.
> This is never even parsed by the brokers -- the information is collected
> and sent to one of the group members which then decodes it and determines
> the assignment info. That result is then returned to the broker which
> disseminates the information (and again, the broker never decodes this, it
> just forwards the appropriate info to each member).
>
> The version is included specifically in the consumer protocol to allow us
> to extend the format in the future. For example, if we needed to add or
> change the way subscriptions are expressed, we could increase that version
> number and update the message format. In other words, it is the mechanism
> we have chosen *only for the consumer embedded protocol* to allow metadata
> format changes. (Note that for the consumer embedded protocol there is also
> *yet another* layer of data, called "UserData" in that protocol
> documentation; this is custom data that the partition assignment strategy
> in the consumer, which is pluggable, might want to include, e.g. if you
> were doing resource-based assignment you might need to include info like
> # of cpus, which is specific to that assignment strategy).
>
> The broker only looks at the ProtocolName (which is equivalent to
> AssignmentStrategy for consumers) when choosing which protocol to use for
> consumers. If you want to version those in an incompatible way (i.e. you
> can't handle the change just by updating the format of your metadata), you
> should include version info in the ProtocolName itself to ensure the group
> coordinator broker can differentiate them, e.g. round-robin vs
> round-robin-2. But you should also think carefully about whether that
> change is necessary -- in many cases if you're not adding any metadata
> you'll be fine just keeping the same name since one member is selected to
> perform the assignment and every other member just needs to respect
> whatever assignment it makes. And of course if you're just trying to switch
> to a completely different assignment strategy (e.g. from range ->
> round-robin), then the name itself is enough. Just bounce all consumers
> adding round-robin as an option, then bounce them all removing range.
>
> We considered other options when designing this protocol, but decided this
> was the best tradeoff. The current protocol is already pretty complex and
> multi-layered and the alternatives that tried to build in versioning at
> this level too were even more complex and confusing.
>
> -Ewen
>
> On Wed, Dec 23, 2015 at 10:45 PM, Oleksiy Krivoshey <oleks...@gmail.com>
> wrote:
>
> > Hi Ewen,
> >
> > I specify version in ProtocolMetadata structure, as per this document:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse
> >
> > ---------------
> > ProtocolType => "consumer"
> >
> > ProtocolName => AssignmentStrategy
> >   AssignmentStrategy => string
> >
> > ProtocolMetadata => Version Subscription UserData
> >   Version => int16
> >   Subscription => [Topic]
> >     Topic => string
> >   UserData => bytes
> > -----------------
> >
> > Maybe I misunderstood the purpose of this version field?
> >
> > On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > Oleksiy,
> > >
> > > Where are you specifying the version? Unless I'm missing something, the
> > > JoinGroup protocol doesn't include versions so I'm not sure I understand
> > > the examples you are giving. Are the version numbers included in the
> > > per-protocol metadata?
> > >
> > > You can see exactly how the consumer coordinator on the broker selects
> > > the protocol here:
> > >
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
> > >
> > > It is just taking the candidate protocols (ones that are available for
> > > all consumers), then has each consumer "vote" by selecting whichever
> > > candidate appears in its list of strategies first, then uses the one
> > > with the most votes.
> > >
> > > Is it possible your example is behaving the way it is because it
> > > actually has duplicates for "strategyX", and in the last case it chooses
> > > the first strategyX despite the conflicting versions?
> > >
> > > -Ewen
> > >
> > > On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <oleks...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I can't understand how protocol upgrades (to a newer version) should
> > > > work. When I send a JoinGroupRequest with a list of assignment
> > > > protocols (same protocol name, different versions), the first
> > > > protocol/version is always picked as the member version. Even if all
> > > > consumers in the group are configured with two versions, the first
> > > > specified version is always selected by the coordinator, not the one
> > > > with the highest version number.
> > > >
> > > > So for example:
> > > > consumer1: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > > consumer2: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > >
> > > > Both will be assigned version 0 in the response to the leader. If I
> > > > make it this way:
> > > >
> > > > consumer1: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > > consumer2: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > >
> > > > Both will be assigned version 1.
> > > >
> > > > In this case:
> > > >
> > > > consumer1: [ {name: strategyX, version: 10}, {name: strategyX, version: 1} ]
> > > > consumer2: [ {name: strategyX, version: 20}, {name: strategyX, version: 1} ]
> > > >
> > > > Kafka will endlessly try to rebalance the group without success
> > > > because consumer1 will have version: 10 and consumer2 version: 20 in
> > > > the JoinGroupResponse.
> > > >
> > > > Can anyone please explain the process of the protocol version upgrade?
> > >
> > > --
> > > Thanks,
> > > Ewen
>
> --
> Thanks,
> Ewen
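
The selection procedure Ewen describes in this thread (take the protocols every member supports, let each member vote for the first candidate in its own list, pick the one with the most votes) can be sketched roughly as below. This is an illustrative Python approximation, not the broker's actual Scala code; the function name, the data shapes, and the alphabetical tie-break are assumptions made for the example.

```python
from collections import Counter


def select_protocol(members):
    """Pick a group protocol by name only.

    members: dict mapping a member id to its ordered list of protocol names.
    Metadata (including any embedded version) is deliberately absent here,
    because the coordinator is described as never looking at it.
    """
    if not members:
        raise ValueError("group has no members")

    # Candidates are the protocol names supported by every member.
    candidates = set.intersection(*(set(names) for names in members.values()))
    if not candidates:
        raise ValueError("no protocol is supported by all members")

    # Each member votes for the first candidate in its own preference list.
    votes = Counter()
    for names in members.values():
        first = next(name for name in names if name in candidates)
        votes[first] += 1

    # Most votes wins; ties are broken alphabetically (an assumption made
    # only to keep this sketch deterministic).
    return max(sorted(votes), key=lambda name: votes[name])


if __name__ == "__main__":
    # Rolling switch from range to round-robin, as suggested in the thread.
    print(select_protocol({"c1": ["range"], "c2": ["round-robin", "range"]}))
    print(select_protocol({"c1": ["round-robin", "range"],
                           "c2": ["round-robin", "range"]}))
    print(select_protocol({"c1": ["round-robin"], "c2": ["round-robin"]}))
```

Because only names take part in the vote, listing strategyX twice with different embedded versions gives the coordinator nothing to distinguish, which is consistent with the behaviour reported in the original question. Putting the version into the name (round-robin vs round-robin-2) makes the two entries separate candidates.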