gabriellefu commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3203340550
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -533,6 +533,18 @@ private void onSuccessResponse(final
StreamsGroupHeartbeatResponse response, fin
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
+ if (data.rackAwareAssignmentTags() != null) {
+ Set<String> clientTagKeys =
streamsRebalanceData.clientTags().keySet();
+ for (String requiredTag : data.rackAwareAssignmentTags()) {
+ if (!clientTagKeys.contains(requiredTag)) {
+ logger.warn("Broker requires client tag '{}' for
rack-aware standby assignment, " +
+ "but this client does not have it configured. " +
+ "Configure it via 'client.tag.{}' in your Streams
config.",
+ requiredTag, requiredTag);
+ }
Review Comment:
I think if the broker return a invalid tag, the client will just log a
warning like "you should add this tag" but it won't reject the server and since
this tag won't be the intersection, it won't change the assignment
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]