chickenchickenlove commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3201549377


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2140,11 +2141,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         }
 
         // Prepare the response.
+        String rackAwareTagsValue = currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
+        List<String> rackAwareAssignmentTags = rackAwareTagsValue.isEmpty()
+            ? Collections.emptyList()
+            : Arrays.asList(rackAwareTagsValue.split(",", -1));
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
             .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            .setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+            .setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId))
+            .setRackAwareAssignmentTags(rackAwareAssignmentTags);

Review Comment:
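   > The list of client tag keys used for rack-aware standby task assignment. Null if unchanged since last heartbeat.
   
   The `about` text in `StreamsGroupHeartbeatResponse.json` says the field is null when the tags are unchanged since the last heartbeat, but this code always sets a list (empty when no tags are configured).
   If there is no change, should we set `null` here instead?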
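
To illustrate the "null if unchanged" semantics the question refers to, here is a minimal sketch. `RackAwareTagsDelta` and `tagsToSend` are hypothetical names, not part of this PR, and the sketch assumes the coordinator can tell which tags a member was last sent, which the diff above does not show.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical helper (not in the PR): decides what to put in the response field.
final class RackAwareTagsDelta {
    private RackAwareTagsDelta() { }

    /**
     * Returns null when the current tags equal the tags last sent to the member,
     * otherwise returns the current tags so the client can pick up the change.
     * A null lastSent means nothing has been sent yet.
     */
    static List<String> tagsToSend(List<String> lastSent, List<String> current) {
        return Objects.equals(lastSent, current) ? null : current;
    }
}
```

With something along these lines, the response builder could call `.setRackAwareAssignmentTags(tagsToSend(lastSent, rackAwareAssignmentTags))`, and an empty list would still be distinguishable from "unchanged".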



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########


Review Comment:
   Existing v0 clients don't know about `RackAwareAssignmentTags`.
   So, IMHO, they would fail to decode a `StreamsGroupHeartbeatResponse` that carries it.
   Should we bump `validVersions` to `0-1`?



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -52,6 +52,8 @@
       "about": "The maximal lag a warm-up task can have to be considered caught-up." },
     { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
       "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
+    { "name": "RackAwareAssignmentTags", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The list of client tag keys used for rack-aware standby task assignment. Null if unchanged since last heartbeat." },

Review Comment:
   Regarding [KIP-1071](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol):
   
   1. Should we set `versions` to `1+`?
   2. Should we use `The assignment tags used by the group coordinator for rack aware standby task assignment.` for the `about` section?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -292,6 +298,11 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+        .define(STREAMS_RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+            LIST,
+            GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DEFAULT,
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DOC)

Review Comment:
   Would it make sense to add a `Validator` here?
   If the config is misconfigured as `group.streams.rack.aware.assignment.tags=zone,,cluster`, it might be parsed as `List.of("zone", "", "cluster")`, leaving an empty tag in the list.
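
As a rough sketch of the kind of check meant here: the class below is a hypothetical, dependency-free stand-in and does not use Kafka's `ConfigDef` API. It only shows the rule being suggested, namely rejecting blank entries such as the one in `zone,,cluster`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a config validator; names are illustrative only.
final class RackAwareTagsValidator {
    private RackAwareTagsValidator() { }

    /** Splits on commas, trims each entry, and rejects blank entries. */
    static List<String> parseAndValidate(String raw) {
        List<String> tags = new ArrayList<>();
        if (raw == null || raw.isEmpty()) {
            return tags; // no tags configured
        }
        // A limit of -1 keeps trailing empty strings, so "zone," is rejected too.
        for (String part : raw.split(",", -1)) {
            String tag = part.trim();
            if (tag.isEmpty()) {
                throw new IllegalArgumentException(
                    "Empty tag in rack-aware assignment tags: '" + raw + "'");
            }
            tags.add(tag);
        }
        return tags;
    }
}
```

In the actual `GroupConfig` definition the same rule would presumably live in a `ConfigDef.Validator` passed to `.define(...)` and operate on the already-parsed list; that wiring is an assumption and is not shown in this diff.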



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -533,6 +533,18 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
         streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
         streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
 
+        if (data.rackAwareAssignmentTags() != null) {
+            Set<String> clientTagKeys = streamsRebalanceData.clientTags().keySet();
+            for (String requiredTag : data.rackAwareAssignmentTags()) {
+                if (!clientTagKeys.contains(requiredTag)) {
+                    logger.warn("Broker requires client tag '{}' for rack-aware standby assignment, " +
+                        "but this client does not have it configured. " +
+                        "Configure it via 'client.tag.{}' in your Streams config.",
+                        requiredTag, requiredTag);
+                }

Review Comment:
   Just out of curiosity: if the server returns an invalid tag, will the client-side validation added in another PR reject it?


