[ https://issues.apache.org/jira/browse/KAFKA-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16940322#comment-16940322 ]
ASF GitHub Bot commented on KAFKA-8649:
---------------------------------------

ableegoldman commented on pull request #7413: KAFKA-8649: version probing with upgraded leader
URL: https://github.com/apache/kafka/pull/7413

Version probing is currently broken when the leader is chosen from one of the new (already upgraded) instances. Older members will blindly upgrade to a version they don't support (and then throw an exception), while newer members will receive an assignment with the older version and trigger a new rebalance, because we always send assignments encoded using the minimum version seen by any client. This leads to a rebalance loop if the older members can't upgrade their subscription. See the ticket for details.

Note that a real "version probing" rebalance is technically one where the leader is old and receives subscriptions it can't understand. In this case we send an assignment back with the old version, which the receiving consumer then knows to downgrade to before triggering another rebalance.

When you have a "new" leader, however, I propose we:

- always send assignments back using the same version as the corresponding subscription, EXCEPT if we notice that everyone now supports the latest version but some are still using the older version. This signals that the rolling upgrade is complete, so send everyone back the latest version.
- if you receive a version greater than the one you sent, it must mean the bounce is over and it is now safe to send new versions. Upgrade your subscription version and trigger a final rebalance. This will actually only happen when the leader is the last to be bounced.
- if the leader is new, it can understand all subscriptions, so we just wait for everyone to be bounced and allow new members to keep sending new-version subscriptions. Once the last member is upgraded, everyone will already be using the new subscription and we don't need to trigger a second and final rebalance.
- if you receive a version less than the one you sent, this is version probing, so downgrade your subscription and trigger another rebalance. This will now only happen when you are actually on a higher version than the leader, so we know this is true version probing.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error while rolling update from Kafka Streams 2.0.0 -> Kafka Streams 2.1.0
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-8649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8649
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Suyash Garg
>            Assignee: Sophie Blee-Goldman
>            Priority: Critical
>             Fix For: 2.0.2, 2.1.2, 2.2.2, 2.3.1
>
>
> While doing a rolling update of a cluster of nodes running a Kafka Streams
> application, the stream threads in the nodes running the old version of the
> library (2.0.0) fail with the following error:
> {code:java}
> [ERROR] [application-existing-StreamThread-336] [o.a.k.s.p.internals.StreamThread] - stream-thread [application-existing-StreamThread-336] Encountered the following error during processing:
> java.lang.IllegalArgumentException: version must be between 1 and 3; was: 4
> 	at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.<init>(SubscriptionInfo.java:67)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscription(StreamsPartitionAssignor.java:312)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:176)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:515)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:466)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:412)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:814)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
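The version rules proposed in the pull request description can be modeled in a few lines of Java. This is a minimal, hypothetical sketch, not Kafka Streams code: `VersionProbingSketch`, `Subscription`, `Reaction`, `Action`, `assignmentVersions`, `onAssignment`, and `checkVersion` are invented names, versions are plain integers, and it assumes the 2.0.0 members top out at version 3 while upgraded members use version 4, as the error message suggests. `checkVersion` only mimics the range check whose message appears in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the version rules proposed in PR #7413. None of these
// names are Kafka Streams APIs; versions are plain ints (3 = the 2.0.0 members'
// latest version, 4 = the upgraded members' version, as the error suggests).
public class VersionProbingSketch {

    // A member's subscription: the version it encoded the subscription with,
    // plus the latest version its library supports.
    record Subscription(int usedVersion, int latestSupportedVersion) {}

    enum Action { NONE, DOWNGRADE_AND_REBALANCE, UPGRADE_AND_REBALANCE }

    // How a member reacts to an assignment, and which version it will use
    // for its next subscription.
    record Reaction(Action action, int nextSubscriptionVersion) {}

    // Leader side, for a "new" leader that can decode every subscription:
    // echo each member's own subscription version, EXCEPT when everyone
    // already supports the leader's latest version but some still use an
    // older one. That signals the rolling upgrade is complete, so everyone
    // is sent the latest version.
    static List<Integer> assignmentVersions(List<Subscription> subs, int leaderLatest) {
        boolean allSupportLatest =
            subs.stream().allMatch(s -> s.latestSupportedVersion() >= leaderLatest);
        boolean someUseOlder =
            subs.stream().anyMatch(s -> s.usedVersion() < leaderLatest);
        List<Integer> versions = new ArrayList<>();
        for (Subscription s : subs) {
            versions.add(allSupportLatest && someUseOlder ? leaderLatest : s.usedVersion());
        }
        return versions;
    }

    // Member side: compare the received assignment version with the version
    // this member subscribed with. In every case the next subscription is
    // encoded with the received version.
    static Reaction onAssignment(int receivedVersion, int usedVersion, int ownLatest) {
        // A member must never subscribe with a version it does not support.
        checkVersion(receivedVersion, ownLatest);
        if (receivedVersion < usedVersion) {
            // True version probing: the leader is older, so downgrade and rejoin.
            return new Reaction(Action.DOWNGRADE_AND_REBALANCE, receivedVersion);
        }
        if (receivedVersion > usedVersion) {
            // The bounce is over: upgrade and trigger one final rebalance.
            return new Reaction(Action.UPGRADE_AND_REBALANCE, receivedVersion);
        }
        return new Reaction(Action.NONE, usedVersion);
    }

    // Range check that mimics the message in the ticket's stack trace
    // (not the real SubscriptionInfo code).
    static void checkVersion(int version, int latestSupported) {
        if (version < 1 || version > latestSupported) {
            throw new IllegalArgumentException(
                "version must be between 1 and " + latestSupported + "; was: " + version);
        }
    }

    public static void main(String[] args) {
        // Mid-upgrade with a new leader: one 2.0.0 member, one bounced member.
        // Each gets its own version back, so neither triggers another rebalance.
        List<Subscription> mid = List.of(new Subscription(3, 3), new Subscription(4, 4));
        System.out.println(assignmentVersions(mid, 4)); // [3, 4]

        // Everyone bounced, but both were downgraded to 3 by an earlier old leader.
        List<Subscription> done = List.of(new Subscription(3, 4), new Subscription(3, 4));
        System.out.println(assignmentVersions(done, 4)); // [4, 4]
        System.out.println(onAssignment(4, 3, 4).action()); // UPGRADE_AND_REBALANCE
    }
}
```

Under these assumptions, a new leader never asks a 2.0.0 member to move past version 3, so the call `onAssignment(4, 3, 3)`, which reproduces the ticket's "version must be between 1 and 3; was: 4" message, is the case the proposal rules out. Echoing each member's own version is what breaks the rebalance loop: a bounced member is no longer forced down to the group minimum.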