chickenchickenlove commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3233794627
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -533,6 +533,18 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
         streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
         streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
+        if (data.rackAwareAssignmentTags() != null) {
+            Set<String> clientTagKeys = streamsRebalanceData.clientTags().keySet();
+            for (String requiredTag : data.rackAwareAssignmentTags()) {
+                if (!clientTagKeys.contains(requiredTag)) {
+                    logger.warn("Broker requires client tag '{}' for rack-aware standby assignment, " +
+                        "but this client does not have it configured. " +
+                        "Configure it via 'client.tag.{}' in your Streams config.",
+                        requiredTag, requiredTag);
+                }
Review Comment:
Ah, thanks for your comments!
I was also wondering how assignment is expected to behave if the broker
receives an invalid rack tag from a client. I can imagine at least two
scenarios:
- A dynamic member joins with an invalid rack tag.
- A static member rejoins with a rack tag that has changed to an invalid value.
I suspect this may be handled in a different PR rather than this one, but I
am curious about the expected behavior 🤔
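
For context, the client-side check in the diff above can be sketched in isolation as follows. This is only an illustration under assumptions: `ClientTagCheck` and `missingTags` are hypothetical names that do not exist in Kafka, the tag keys and values are made up, and the sketch says nothing about how the broker treats an invalid tag in either scenario.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class ClientTagCheck {

    // Hypothetical helper, not part of Kafka: returns the broker-required tag
    // keys that are absent from the client's configured tags.
    public static List<String> missingTags(Collection<String> requiredTags,
                                           Map<String, String> clientTags) {
        List<String> missing = new ArrayList<>();
        if (requiredTags == null) {
            // Mirrors the null guard in the diff: nothing required, nothing missing.
            return missing;
        }
        for (String requiredTag : requiredTags) {
            if (!clientTags.containsKey(requiredTag)) {
                missing.add(requiredTag);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Made-up sample values for illustration only.
        Map<String, String> clientTags = Map.of("zone", "eu-west-1a");
        List<String> required = List.of("zone", "cluster");
        for (String tag : missingTags(required, clientTags)) {
            System.out.println("Missing client tag: " + tag);
        }
    }
}
```

With these sample values only `cluster` is reported as missing. Note that the check is purely about which keys are present; whether a present tag carries a valid value is exactly the open question above.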