chickenchickenlove commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3321147169


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8961,9 +8983,12 @@ private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
         Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
         final Integer numStandbyReplicas = groupConfig.flatMap(GroupConfig::streamsNumStandbyReplicas)
             .orElse(config.streamsGroupNumStandbyReplicas());
-        return new TreeMap<>(Map.of(
-            "num.standby.replicas", numStandbyReplicas.toString()
-        ));
+        final List<String> rackAwareAssignmentTags = groupConfig.flatMap(GroupConfig::streamsRackAwareAssignmentTags)
+            .orElse(config.streamsGroupRackAwareAssignmentTags());
+        Map<String, String> configs = new TreeMap<>();
+        configs.put("num.standby.replicas", numStandbyReplicas.toString());
+        configs.put("rack.aware.assignment.tags", String.join(",", rackAwareAssignmentTags));
+        return configs;

Review Comment:
   I think this change can cause an unexpected group epoch bump during a
   broker rolling upgrade, even when `rack.aware.assignment.tags` has not
   changed (see GroupMetadataManager L2076).
   
   For example:
   - Before the rolling upgrade, there is no `rack.aware.assignment.tags` entry: `{"num.standby.replicas":"0"}`
   - After upgrading to a broker with this PR: `{"num.standby.replicas":"0","rack.aware.assignment.tags":""}`
   
   The two maps differ, so the group epoch is bumped.
   Given KIP-1071, this is an undesirable side effect.
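   
   To make the concern concrete, here is a minimal, standalone sketch. It
   assumes the coordinator bumps the epoch whenever the freshly computed
   config map is not equal to the previously stored one. The class and
   method names are hypothetical and are not taken from the PR;
   `configsOmittingEmptyTags` is only one possible way to keep the map
   stable, not a claim about how the PR should be fixed.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone sketch; not code from GroupMetadataManager.
public class AssignmentConfigsSketch {

    // Shape of the map produced before this PR.
    static Map<String, String> configsBefore(int numStandbyReplicas) {
        Map<String, String> configs = new TreeMap<>();
        configs.put("num.standby.replicas", Integer.toString(numStandbyReplicas));
        return configs;
    }

    // Shape of the map produced with this PR: the tags key is always present.
    static Map<String, String> configsWithPr(int numStandbyReplicas, List<String> tags) {
        Map<String, String> configs = configsBefore(numStandbyReplicas);
        configs.put("rack.aware.assignment.tags", String.join(",", tags));
        return configs;
    }

    // Assumed alternative: only add the tags key when at least one tag is set.
    static Map<String, String> configsOmittingEmptyTags(int numStandbyReplicas, List<String> tags) {
        Map<String, String> configs = configsBefore(numStandbyReplicas);
        if (!tags.isEmpty()) {
            configs.put("rack.aware.assignment.tags", String.join(",", tags));
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> before = configsBefore(0);
        // Differs from the old map although no tags are configured.
        System.out.println(before.equals(configsWithPr(0, List.of())));            // false
        // Equal to the old map when no tags are configured.
        System.out.println(before.equals(configsOmittingEmptyTags(0, List.of()))); // true
    }
}
```

   With the assumed alternative, a group without rack-aware tags keeps the
   same map across the upgrade, while a group that sets tags still gets a
   changed map.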
   
   What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2189,6 +2190,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
                 )
         ));
 
+        String rackAwareTagsValue = currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
+        if (!rackAwareTagsValue.isEmpty()) {
+            List<String> requiredTags = Arrays.stream(rackAwareTagsValue.split(",", -1))
+                .filter(tag -> !tag.isEmpty())
+                .collect(Collectors.toList());
+            Set<String> memberTagKeys = updatedMember.clientTags().keySet();
+            List<String> missingTags = requiredTags.stream()
+                .filter(tag -> !memberTagKeys.contains(tag))
+                .collect(Collectors.toList());
+            if (!missingTags.isEmpty()) {
+                returnedStatus.add(
+                    new Status()
+                        .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+                        .setStatusDetail(
+                            String.format("Missing required client tags for rack-aware standby assignment: %s. " +
+                                "Configure them via 'client.tag.<tagKey>' in your Streams config.", missingTags)
+                        )
+                );
+            }
+        }
+
+

Review Comment:
   Should we prevent invalid values from being stored in the member metadata?
   
   Currently, it seems that invalid data can be stored in the member metadata
   and later passed to the assignor. For example, if a value like
   `zone,,cluster` is provided, it appears to be passed to the assignor as-is.
   
   Would it be possible to prevent invalid values from being propagated to 
other instances through GroupMetadataManager?
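   
   As a rough illustration of the kind of check meant here, the following
   standalone sketch rejects a tag list that contains empty entries instead
   of silently filtering them. `RackAwareTagsSketch` and `parseTags` are
   hypothetical names, and where such validation would actually live in the
   coordinator is an open question; trimming whitespace is also an
   assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch; not code from the PR.
public class RackAwareTagsSketch {

    // Parses a comma-separated tag list and fails fast on empty entries,
    // so a value such as "zone,,cluster" is never propagated further.
    static List<String> parseTags(String raw) {
        List<String> tags = new ArrayList<>();
        if (raw == null || raw.isEmpty()) {
            return tags; // No tags configured: rack-aware assignment disabled.
        }
        // Limit -1 keeps trailing empty strings, so "zone," is also rejected.
        for (String part : raw.split(",", -1)) {
            String tag = part.trim();
            if (tag.isEmpty()) {
                throw new IllegalArgumentException(
                    "Empty tag key in rack.aware.assignment.tags value '" + raw + "'");
            }
            tags.add(tag);
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(parseTags("zone,cluster")); // [zone, cluster]
        try {
            parseTags("zone,,cluster");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Failing fast keeps the stored value and the value seen by the assignor
   identical, at the cost of rejecting input that the current filter-based
   code would tolerate.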



