[ https://issues.apache.org/jira/browse/KAFKA-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940322#comment-16940322 ]

ASF GitHub Bot commented on KAFKA-8649:
---------------------------------------

ableegoldman commented on pull request #7413: KAFKA-8649: version probing with upgraded leader
URL: https://github.com/apache/kafka/pull/7413
 
 
   Version probing is currently broken when the leader is chosen from one of
the new (already upgraded) instances. Older members will blindly upgrade to a
version they don't support (and then throw an exception), while newer members
will receive an assignment with the older version and trigger a new rebalance
(leading to a rebalance loop if the older members can't upgrade their
subscription). This happens because we always send assignments encoded using
the minimum version seen by any client; see the ticket for details.
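To make the failure mode concrete, here is a deliberately simplified model of the pre-fix leader behavior. It is a hypothetical sketch, not the actual StreamsPartitionAssignor code, and the class and method names are made up for illustration. It assumes a mid-bounce group where one 2.0.0 member subscribes with version 3 and two upgraded members subscribe with version 4, and it only models the newer members' side of the problem:

```java
import java.util.List;

/**
 * Hypothetical model of the pre-fix behavior: the leader encodes every
 * assignment with the minimum subscription version it received.
 * Not the real StreamsPartitionAssignor code.
 */
public class MinVersionAssignmentModel {

    /** Version the leader uses for every assignment in this model. */
    static int assignmentVersion(List<Integer> subscriptionVersions) {
        int min = Integer.MAX_VALUE;
        for (int v : subscriptionVersions) {
            min = Math.min(min, v);
        }
        return min;
    }

    /** A member that gets back a lower version than it sent treats it as version probing. */
    static boolean triggersRebalance(int sentVersion, int receivedVersion) {
        return receivedVersion < sentVersion;
    }

    public static void main(String[] args) {
        // Mid-bounce: one 2.0.0 member (version 3) and two upgraded members (version 4).
        List<Integer> subscriptions = List.of(3, 4, 4);
        int encoded = assignmentVersion(subscriptions);
        System.out.println("assignments encoded with version " + encoded);
        for (int sent : subscriptions) {
            System.out.println("member sent v" + sent + " -> rebalance again? "
                    + triggersRebalance(sent, encoded));
        }
    }
}
```

Because everything is encoded at version 3, both upgraded members see a version lower than the one they sent and react as if this were version probing, which is what triggers the extra rebalance.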
   
   Note that a real "version probing" rebalance is technically one where the
leader is old and receives subscriptions it can't understand. In this case we
send an assignment back with the old version, which tells the receiving
consumer to downgrade to that version and trigger another rebalance.
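A minimal sketch of this "real" version probing round trip, assuming the old leader supports up to version 3 and simply replies with the highest version it understands. The class and method names are hypothetical:

```java
/**
 * Hypothetical sketch of "real" version probing: the leader runs the old
 * library and receives a subscription newer than it supports.
 */
public class VersionProbingSketch {

    /** Leader replies with the highest version it supports, capped at what the member sent. */
    static int leaderReplyVersion(int leaderLatestSupported, int subscriptionVersion) {
        return Math.min(leaderLatestSupported, subscriptionVersion);
    }

    /** Member side: returns the subscription version to use on the next join. */
    static int nextSubscriptionVersion(int sentVersion, int receivedVersion) {
        // A lower assignment version means the leader is older: downgrade and rejoin.
        return receivedVersion < sentVersion ? receivedVersion : sentVersion;
    }

    public static void main(String[] args) {
        int leaderLatest = 3; // leader still runs the old library
        int sent = 4;         // this member was already bounced to the new library
        int received = leaderReplyVersion(leaderLatest, sent);
        int next = nextSubscriptionVersion(sent, received);
        System.out.println("received v" + received + ", rejoin with v" + next
                + ", rebalance=" + (next != sent));
    }
}
```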
   
   When you have a "new" leader, however, I propose we:
   
   - always send assignments back using the same version as the corresponding
subscription, EXCEPT if we notice that everyone now supports the latest version
but some are still using the older version. This signals that the rolling
upgrade is complete, so send everyone back the latest version.
   - if you receive a version greater than the one you sent, it must mean the
bounce is over and it is now safe to send new versions. Upgrade your
subscription version and trigger a final rebalance. This will only happen when
the leader is the last to be bounced.
   - if the leader is new, it can understand all subscriptions, so we just wait
for everyone to be bounced and allow new members to keep sending new-version
subscriptions. Once the last member is upgraded, everyone will already be using
the new subscription and we don't need to trigger a second and final rebalance.
   - if you receive a version less than what you sent, this is version probing,
so downgrade your subscription and trigger another rebalance. This will now
only happen when you are actually on a higher version than the leader, so we
know it is true version probing.
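The leader-side and member-side rules above can be sketched as follows. This is a hypothetical model, not the code in the PR: the Member class, the method names, and the assumption that each subscription carries both the version in use and the latest supported version are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed rules for an upgraded leader. */
public class UpgradedLeaderSketch {

    /** What a member reports in its subscription (modelled, not the real wire format). */
    static final class Member {
        final int usedVersion;
        final int latestSupportedVersion;

        Member(int usedVersion, int latestSupportedVersion) {
            this.usedVersion = usedVersion;
            this.latestSupportedVersion = latestSupportedVersion;
        }
    }

    enum Action { NONE, UPGRADE_AND_REBALANCE, DOWNGRADE_AND_REBALANCE }

    /** Leader: choose the version used to encode each member's assignment. */
    static Map<String, Integer> assignmentVersions(Map<String, Member> members, int leaderLatest) {
        boolean allSupportLatest = true;
        boolean someUseOlder = false;
        for (Member m : members.values()) {
            allSupportLatest &= m.latestSupportedVersion >= leaderLatest;
            someUseOlder |= m.usedVersion < leaderLatest;
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Member> e : members.entrySet()) {
            // Default: answer in the version the member subscribed with.
            // Exception: everyone supports the latest version but some still use an
            // older one, so the rolling upgrade is complete and everyone gets the latest.
            int version = (allSupportLatest && someUseOlder) ? leaderLatest : e.getValue().usedVersion;
            result.put(e.getKey(), version);
        }
        return result;
    }

    /** Member: compare the received assignment version with the subscription version it sent. */
    static Action onAssignment(int sentVersion, int receivedVersion) {
        if (receivedVersion > sentVersion) {
            return Action.UPGRADE_AND_REBALANCE;   // bounce is over: upgrade the subscription
        } else if (receivedVersion < sentVersion) {
            return Action.DOWNGRADE_AND_REBALANCE; // true version probing: leader is older
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Mid-bounce: "a" is still on the old library, "b" and "c" are upgraded.
        Map<String, Member> midBounce = new LinkedHashMap<>();
        midBounce.put("a", new Member(3, 3));
        midBounce.put("b", new Member(4, 4));
        midBounce.put("c", new Member(4, 4));
        System.out.println(assignmentVersions(midBounce, 4)); // {a=3, b=4, c=4}

        // Leader bounced last: the others were downgraded to 3 earlier but support 4.
        Map<String, Member> leaderLast = new LinkedHashMap<>();
        leaderLast.put("a", new Member(3, 4));
        leaderLast.put("b", new Member(3, 4));
        leaderLast.put("leader", new Member(4, 4));
        System.out.println(assignmentVersions(leaderLast, 4)); // {a=4, b=4, leader=4}
        System.out.println(onAssignment(3, 4)); // UPGRADE_AND_REBALANCE
    }
}
```

In the second scenario the other members were previously downgraded to version 3 by the old leader, so the new leader sees everyone supporting version 4 while some still use 3. It answers with 4, and each of those members then upgrades and triggers the final rebalance.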
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error while rolling update from Kafka Streams 2.0.0 -> Kafka Streams 2.1.0
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-8649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8649
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Suyash Garg
>            Assignee: Sophie Blee-Goldman
>            Priority: Critical
>             Fix For: 2.0.2, 2.1.2, 2.2.2, 2.3.1
>
>
> While doing a rolling update of a cluster of nodes running a Kafka Streams
> application, the stream threads on the nodes still running the old version of
> the library (2.0.0) fail with the following error:
> {code:java}
> [ERROR] [application-existing-StreamThread-336] [o.a.k.s.p.internals.StreamThread] - stream-thread [application-existing-StreamThread-336] Encountered the following error during processing:
> java.lang.IllegalArgumentException: version must be between 1 and 3; was: 4
> 	at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.<init>(SubscriptionInfo.java:67)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscription(StreamsPartitionAssignor.java:312)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:176)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:515)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:466)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:412)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:814)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}
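The exception above comes from a subscription version range check on the 2.0.0 member. The following is a hypothetical reconstruction inferred only from the log message (the class and method names are made up, and this is not the real SubscriptionInfo source); it shows why a 2.0.0 member that tries to encode a version-4 subscription fails:

```java
/**
 * Hypothetical reconstruction of the range check behind the error above,
 * inferred from the log message only; not copied from SubscriptionInfo.
 */
public class SubscriptionVersionCheck {

    // Per the error message, a 2.0.0 member accepts subscription versions 1 through 3.
    static final int LATEST_SUPPORTED_VERSION = 3;

    static void validate(int version) {
        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
            throw new IllegalArgumentException(
                    "version must be between 1 and " + LATEST_SUPPORTED_VERSION + "; was: " + version);
        }
    }

    public static void main(String[] args) {
        validate(3); // accepted by a 2.0.0 member
        try {
            validate(4); // what the old member attempts after "upgrading" to the leader's version
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // version must be between 1 and 3; was: 4
        }
    }
}
```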



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
