Hi Ewen,

I specify the version in the ProtocolMetadata structure, as described in this document:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse

---------------
ProtocolType => "consumer"

ProtocolName => AssignmentStrategy
  AssignmentStrategy => string

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => bytes
-----------------
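
For concreteness, here is a minimal Python sketch of how that structure could be serialized. It assumes the primitive encodings from the protocol guide (big-endian int16/int32, strings prefixed with an int16 length, bytes prefixed with an int32 length, -1 for null bytes). The function names are hypothetical and not taken from any Kafka client.

```python
import struct


def encode_string(s):
    # string => int16 length followed by UTF-8 bytes
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_bytes(b):
    # bytes => int32 length followed by raw bytes; -1 length means null
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b


def encode_protocol_metadata(version, topics, user_data=None):
    # ProtocolMetadata => Version Subscription UserData
    out = struct.pack(">h", version)        # Version => int16
    out += struct.pack(">i", len(topics))   # array => int32 element count
    for topic in topics:
        out += encode_string(topic)         # Topic => string
    out += encode_bytes(user_data)          # UserData => bytes
    return out
```

Note that the version lives inside this opaque byte blob, not next to the protocol name in the JoinGroup request.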

Maybe I misunderstood the purpose of this version field?
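
To check my reading of the selection described in the quoted message below, here is a minimal Python sketch of that voting scheme. It is not the broker code: the function name and the input shape (member id mapped to an ordered list of protocol names) are assumptions for illustration, and it expects at least one member.

```python
from collections import Counter


def select_protocol(members):
    """members: dict of member id -> ordered list of protocol names."""
    # Candidates are the protocol names every member supports.
    candidates = set.intersection(*(set(p) for p in members.values()))
    if not candidates:
        raise ValueError("no protocol is supported by all members")

    # Each member votes for the first candidate in its own preference list.
    votes = Counter()
    for protocols in members.values():
        choice = next(p for p in protocols if p in candidates)
        votes[choice] += 1

    # The candidate with the most votes wins.
    return votes.most_common(1)[0][0]
```

In this sketch only the names take part in the vote, so two entries both called strategyX are indistinguishable to the selection, whatever version their metadata carries.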

On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Oleksiy,
>
> Where are you specifying the version? Unless I'm missing something, the
> JoinGroup protocol doesn't include versions so I'm not sure I understand
> the examples you are giving. Are the version numbers included in the
> per-protocol metadata?
>
> You can see exactly how the consumer coordinator on the broker selects the
> protocol here:
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
> It is just taking the candidate protocols (ones that are available for all
> consumers), then has each consumer "vote" by selecting whichever candidate
> appears in its list of strategies first, then uses the one with the most
> votes.
>
> Is it possible your example is behaving the way it is because it actually
> has duplicates for "strategyX", and in the last case it chooses the first
> strategyX despite the conflicting versions?
>
> -Ewen
>
> On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <oleks...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I can't understand how the protocol upgrades (to newer version) should
> > work. When I send GroupJoinRequest with a list of assignment protocols
> > (same protocol name, different versions) always the first protocol/version
> > gets picked up as a member version. Even if all consumers in the group are
> > configured with two versions still always the first specified version will
> > be selected by coordinator and not the one with highest version number.
> >
> > So for example:
> > consumer1: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} ]
> > consumer2: [ {name:strategyX, version: 0}, {name: strategyX, version: 1} ]
> >
> > Both will be assigned a version 0 in a response to leader. If I make it
> > this way:
> >
> > consumer1: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} ]
> > consumer2: [ {name:strategyX, version: 1}, {name: strategyX, version: 0} ]
> >
> > Both will be assigned version 1.
> >
> > In this case:
> >
> > consumer1: [ {name:strategyX, version: 10}, {name: strategyX, version: 1} ]
> > consumer2: [ {name:strategyX, version: 20}, {name: strategyX, version: 1} ]
> >
> > Kafka will endlessly try to rebalance the group without success because
> > consumer1 will have version:10 and consumer2 - version:20 in a
> > GroupJoinResponse.
> >
> > Can anyone please explain the process of the protocol version upgrade?
> >
>
>
>
> --
> Thanks,
> Ewen
>
