Hi Ewen, I specify version in ProtocolMetadata structure, as per this document: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse
--------------- ProtocolType => "consumer" ProtocolName => AssignmentStrategy AssignmentStrategy => string ProtocolMetadata => Version Subscription UserData Version => int16 Subscription => [Topic] Topic => string UserData => bytes ----------------- Maybe I misunderstood the purpose of this version field? On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <e...@confluent.io> wrote: > Oleksiy, > > Where are you specifying the version? Unless I'm missing something, the > JoinGroup protocol doesn't include versions so I'm not sure I understand > the examples you are giving. Are the version numbers included in the > per-protocol metadata? > > You can see exactly how the consumer coordinator on the broker selects the > protocol here: > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179 > It is just taking the candidate protocols (ones that are available for all > consumers), then has each consumer "vote" by selecting whichever candidate > appears in its list of strategies first, then uses the one with the most > votes. > > Is it possible your example is behaving the way it is because it actually > has duplicates for "strategyX", and in the last case it chooses the first > strategyX despite the conflicting versions? > > -Ewen > > On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <oleks...@gmail.com> > wrote: > > > Hi, > > > > I can't understand how the protocol upgrades (to newer version) should > > work. When I send GroupJoinRequest with a list of assignment protocols > > (same protocol name, different versions) always the first > protocol/version > > gets picked up as a member version. Even if all consumers in the group > are > > configured with two versions still always the first specified version > will > > be selected by coordinator and not the one with highest version number. > > > > So for example: > > consumer1: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} > ] > > consumer2: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} > ] > > > > Both will be assigned a version 0 in a response to leader. If I make it > > this way: > > > > consumer1: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} > ] > > consumer2: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} > ] > > > > Both will be assigned version 1. > > > > In this case: > > > > consumer1: [ {name:strategyX, version: 10}, {name: strategyX, version: > 1} ] > > consumer2: [ {name:strategyX, version: 20}, {name: strategyX, version: > 1} ] > > > > Kafka will endlessly try to rebalance the group without success because > > consumer1 will have version:10 and consumer2 - version:20 in a > > GroupJoinResponse. > > > > Can anyone please explain the process of the protocol version upgrade? > > > > > > -- > Thanks, > Ewen >