[
https://issues.apache.org/jira/browse/KAFKA-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940322#comment-16940322
]
ASF GitHub Bot commented on KAFKA-8649:
---------------------------------------
ableegoldman commented on pull request #7413: KAFKA-8649: version probing with
upgraded leader
URL: https://github.com/apache/kafka/pull/7413
Version probing is currently broken when the leader is chosen from one of
the new (already upgraded) instances. Older members will blindly upgrade to
a version they don't support and then throw an exception. Newer members,
meanwhile, will receive an assignment with the older version and trigger a new
rebalance, leading to a rebalance loop if the older members can't upgrade their
subscription. This happens because we always send assignments encoded using the
min version seen by any client; see the ticket for details.
Note that a real "version probing" rebalance is technically one where the
leader is old and receives subscriptions it can't understand. In this case we
send an assignment back with the old version; the receiving consumer then
knows to downgrade to that version and trigger another rebalance.
When you have a "new" leader, however, I propose we:
- Always send assignments back using the same version as the corresponding
subscription, EXCEPT if we notice that everyone now supports the latest version
but some are still using the older version. This signals that the rolling
upgrade is complete, so send everyone back the latest version.
- If you receive a version greater than the one you sent, it must mean the
bounce is over and it is now safe to send new versions. Upgrade your
subscription version and trigger a final rebalance. This will actually only
happen when the leader is the last to be bounced.
- If the leader is new, it can understand all subscriptions, so we just wait
for everyone to be bounced and allow new members to keep sending new-version
subscriptions. Once the last member is upgraded, everyone will already be using
the new subscription version and we don't need to trigger a second and final
rebalance.
- If you receive a version less than what you sent, this is version probing,
so downgrade your subscription and trigger another rebalance. This will now
only happen when you are actually on a higher version than the leader, so we
know this is true version probing.
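
The rules proposed in the list above can be sketched roughly as follows. This
is only an illustration under simplifying assumptions, not the code in the pull
request: versions are modelled as plain integers, and every name here
(`VersionProbingSketch`, `Subscription`, `assignmentVersion`, `onAssignment`,
`Action`) is hypothetical. It also assumes each subscription tells the leader
both the version the member used and the latest version it supports.

```java
import java.util.List;

// Hypothetical sketch of the proposed rules; not taken from the Kafka code base.
final class VersionProbingSketch {

    /** What a member does after comparing the received version with the one it sent. */
    enum Action { KEEP_VERSION, UPGRADE_AND_REBALANCE, DOWNGRADE_AND_REBALANCE }

    /** Assumed shape of what the leader learns from each member's subscription. */
    static final class Subscription {
        final int usedVersion;            // version the subscription was encoded with
        final int latestSupportedVersion; // highest version the member can handle

        Subscription(int usedVersion, int latestSupportedVersion) {
            this.usedVersion = usedVersion;
            this.latestSupportedVersion = latestSupportedVersion;
        }
    }

    /**
     * Leader side, for an already-upgraded leader: reply with the member's own
     * subscription version, unless everyone supports the latest version while
     * someone still uses an older one, in which case reply with the latest.
     */
    static int assignmentVersion(Subscription member, List<Subscription> all, int latestVersion) {
        boolean everyoneSupportsLatest = true;
        boolean someoneUsesOlder = false;
        for (Subscription s : all) {
            if (s.latestSupportedVersion < latestVersion) {
                everyoneSupportsLatest = false;
            }
            if (s.usedVersion < latestVersion) {
                someoneUsesOlder = true;
            }
        }
        if (everyoneSupportsLatest && someoneUsesOlder) {
            return latestVersion; // rolling upgrade is complete
        }
        return member.usedVersion;
    }

    /** Member side: react to the version of the assignment that came back. */
    static Action onAssignment(int sentVersion, int receivedVersion) {
        if (receivedVersion > sentVersion) {
            return Action.UPGRADE_AND_REBALANCE;   // bounce is over
        }
        if (receivedVersion < sentVersion) {
            return Action.DOWNGRADE_AND_REBALANCE; // true version probing
        }
        return Action.KEEP_VERSION;
    }
}
```

In this sketch a member that has not been bounced yet (uses and supports only
the old version) keeps getting its own version back, so it never sees a version
it cannot handle.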
> Error while rolling update from Kafka Streams 2.0.0 -> Kafka Streams 2.1.0
> --------------------------------------------------------------------------
>
> Key: KAFKA-8649
> URL: https://issues.apache.org/jira/browse/KAFKA-8649
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Suyash Garg
> Assignee: Sophie Blee-Goldman
> Priority: Critical
> Fix For: 2.0.2, 2.1.2, 2.2.2, 2.3.1
>
>
> While doing a rolling update of a cluster of nodes running a Kafka Streams
> application, the stream threads on the nodes running the old version of the
> library (2.0.0) fail with the following error:
> {code:java}
> [ERROR] [application-existing-StreamThread-336] [o.a.k.s.p.internals.StreamThread] - stream-thread [application-existing-StreamThread-336] Encountered the following error during processing:
> java.lang.IllegalArgumentException: version must be between 1 and 3; was: 4
>     at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.<init>(SubscriptionInfo.java:67)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscription(StreamsPartitionAssignor.java:312)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:176)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:515)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:466)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:412)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:814)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}