mjsax commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3353818484


##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json:
##########
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "StreamsGroupHeartbeatRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   The version was bumped in another PR in the meantime, so we don't need 
to do this any longer. That's also why there are merge conflicts. Rebasing onto 
`trunk` will fix this issue.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -541,10 +543,21 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
         if (statuses != null) {
             streamsRebalanceData.setStatuses(statuses);
             if (!statuses.isEmpty()) {
+                for (StreamsGroupHeartbeatResponseData.Status status : statuses) {

Review Comment:
   It seems we are now looping over `statuses` twice: once in this new for-loop 
and once in the `statuses.streams()...` code below.
   
   Might be good to unify the code so we only loop once?
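
   A minimal sketch of what a single pass could look like. The `Status` and 
`Result` classes and the two per-status actions (building a message and 
collecting the code) are hypothetical stand-ins, since the actual loop bodies 
are not shown in this hunk:

```java
import java.util.ArrayList;
import java.util.List;

class SinglePassStatuses {
    // Hypothetical stand-in for StreamsGroupHeartbeatResponseData.Status.
    static final class Status {
        final byte statusCode;
        final String statusDetail;

        Status(byte statusCode, String statusDetail) {
            this.statusCode = statusCode;
            this.statusDetail = statusDetail;
        }
    }

    // Hypothetical holder for everything the two former loops produced.
    static final class Result {
        final List<String> messages = new ArrayList<>();
        final List<Byte> codes = new ArrayList<>();
    }

    // Does the work of both loops while visiting each status exactly once.
    static Result process(List<Status> statuses) {
        Result result = new Result();
        for (Status status : statuses) {
            // Work previously done by the first loop (assumed: per-status message).
            result.messages.add(status.statusCode + ": " + status.statusDetail);
            // Work previously done by the stream pipeline (assumed: collect codes).
            result.codes.add(status.statusCode);
        }
        return result;
    }

    public static void main(String[] args) {
        Result result = process(List.of(
            new Status((byte) 1, "first detail"),
            new Status((byte) 2, "second detail")));
        System.out.println(result.messages);
        System.out.println(result.codes);
    }
}
```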



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -17,7 +17,7 @@
   "apiKey": 88,
   "type": "response",
   "name": "StreamsGroupHeartbeatResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
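   Same as above: the version was already bumped in another PR, so this 
change is no longer needed after rebasing.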



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -693,7 +699,6 @@ public GroupCoordinatorConfig(AbstractConfig config) {
             String.format("%s must be less than or equal to %s",
                 SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
 
-

Review Comment:
   I see. Instead of doing this, let's split out a helper method 
`verifyConfigs()` (or maybe even multiple, one for each "group of configs", 
like `verifyShareGroupConfigs()`, `verifyStreamsGroupConfigs()` and similar), 
so that the constructor actually gets shorter.
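
   Roughly along these lines, as a sketch only. The class, the map-based 
storage, and the config keys below are hypothetical placeholders, not the real 
`GroupCoordinatorConfig` fields or constants:

```java
import java.util.Map;

class GroupConfigVerificationSketch {
    private final Map<String, Integer> values;

    GroupConfigVerificationSketch(Map<String, Integer> values) {
        this.values = values;
        // The constructor only delegates; each group of configs has its own check.
        verifyShareGroupConfigs();
        verifyStreamsGroupConfigs();
    }

    // Hypothetical keys standing in for the share group interval configs.
    private void verifyShareGroupConfigs() {
        requireAtMost("share.interval.ms", "share.max.interval.ms");
    }

    // Hypothetical keys standing in for the streams group configs.
    private void verifyStreamsGroupConfigs() {
        requireAtMost("streams.interval.ms", "streams.max.interval.ms");
    }

    // Shared check: the value under key must not exceed the value under maxKey.
    private void requireAtMost(String key, String maxKey) {
        int value = values.getOrDefault(key, 0);
        int max = values.getOrDefault(maxKey, Integer.MAX_VALUE);
        if (value > max) {
            throw new IllegalArgumentException(
                String.format("%s must be less than or equal to %s", key, maxKey));
        }
    }

    public static void main(String[] args) {
        new GroupConfigVerificationSketch(
            Map.of("share.interval.ms", 10, "share.max.interval.ms", 20));
        System.out.println("configs verified");
    }
}
```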



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -52,7 +52,6 @@
       "about": "The maximal lag a warm-up task can have to be considered caught-up." },
     { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
       "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
-

Review Comment:
   nit: avoid unnecessary reformatting



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2189,6 +2190,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
                 )
         ));
 
+        String rackAwareTagsValue = currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
+        if (!rackAwareTagsValue.isEmpty()) {
+            List<String> requiredTags = Arrays.stream(rackAwareTagsValue.split(",", -1))
+                .filter(tag -> !tag.isEmpty())
+                .collect(Collectors.toList());
+            Set<String> memberTagKeys = updatedMember.clientTags().keySet();
+            List<String> missingTags = requiredTags.stream()
+                .filter(tag -> !memberTagKeys.contains(tag))
+                .collect(Collectors.toList());
+            if (!missingTags.isEmpty()) {
+                returnedStatus.add(
+                    new Status()
+                        .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+                        .setStatusDetail(
+                            String.format("Missing required client tags for rack-aware standby assignment: %s. " +
+                                "Configure them via 'client.tag.<tagKey>' in your Streams config.", missingTags)
+                        )
+                );
+            }
+        }
+

Review Comment:
   Not sure. I am wondering if this is something the assignor needs to decide 
how to handle?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8961,9 +8983,12 @@ private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
         Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
         final Integer numStandbyReplicas = groupConfig.flatMap(GroupConfig::streamsNumStandbyReplicas)
             .orElse(config.streamsGroupNumStandbyReplicas());
-        return new TreeMap<>(Map.of(
-            "num.standby.replicas", numStandbyReplicas.toString()
-        ));
+        final List<String> rackAwareAssignmentTags = groupConfig.flatMap(GroupConfig::streamsRackAwareAssignmentTags)
+            .orElse(config.streamsGroupRackAwareAssignmentTags());
+        Map<String, String> configs = new TreeMap<>();
+        configs.put("num.standby.replicas", numStandbyReplicas.toString());
+        configs.put("rack.aware.assignment.tags", String.join(",", rackAwareAssignmentTags));
+        return configs;

Review Comment:
   This is a valid point. I guess we have the same problem in 
https://github.com/apache/kafka/pull/21799
   
   While we could maybe avoid it here (by not adding the config if it's the 
default, i.e., an empty string), I'm not sure how to handle the case of PR 
21799. And if we cannot fix it over there, it doesn't matter what we do here 
either.
   
   Although I am now wondering about `acceptable.recovery.lag`: is it actually 
an "assignment configuration" we need to add here? Maybe we need to update PR 
21799 to begin with, allowing us to change the code here to only add the config 
if it is a non-empty string. cc @lucasbru 
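
   For illustration, "only add if non-empty" could look something like the 
sketch below. The standalone class and the method signature are hypothetical 
simplifications of `streamsGroupAssignmentConfigs`; only the two config keys are 
taken from the diff:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AssignmentConfigsSketch {
    // Hypothetical simplified signature: the real method resolves these values
    // from the group config and the coordinator config.
    static Map<String, String> assignmentConfigs(int numStandbyReplicas,
                                                 List<String> rackAwareAssignmentTags) {
        Map<String, String> configs = new TreeMap<>();
        configs.put("num.standby.replicas", Integer.toString(numStandbyReplicas));
        // Leave the key out entirely when the tag list is the default (empty),
        // so the resulting map is unchanged for groups that do not use the feature.
        if (!rackAwareAssignmentTags.isEmpty()) {
            configs.put("rack.aware.assignment.tags", String.join(",", rackAwareAssignmentTags));
        }
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(assignmentConfigs(1, List.of()));
        System.out.println(assignmentConfigs(1, List.of("zone", "cluster")));
    }
}
```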



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
