mjsax commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3203826150


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -553,6 +557,7 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupMaxSize;
     private final int streamsGroupNumStandbyReplicas;
     private final int streamsGroupMaxStandbyReplicas;
+    private final List<String> streamsGroupRackAwareAssignmentTags;

Review Comment:
   Doesn't really matter, but why insert this here and not at the end of the list?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2140,11 +2141,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         }
 
         // Prepare the response.
+        String rackAwareTagsValue = currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
+        List<String> rackAwareAssignmentTags = rackAwareTagsValue.isEmpty()
+            ? Collections.emptyList()
+            : Arrays.asList(rackAwareTagsValue.split(",", -1));
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
             .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            .setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+            .setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId))
+            .setRackAwareAssignmentTags(rackAwareAssignmentTags);

Review Comment:
   Yes.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -533,6 +533,18 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
         streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
         streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
 
+        if (data.rackAwareAssignmentTags() != null) {
+            Set<String> clientTagKeys = streamsRebalanceData.clientTags().keySet();
+            for (String requiredTag : data.rackAwareAssignmentTags()) {
+                if (!clientTagKeys.contains(requiredTag)) {
+                    logger.warn("Broker requires client tag '{}' for rack-aware standby assignment, " +
+                        "but this client does not have it configured. " +
+                        "Configure it via 'client.tag.{}' in your Streams config.",
+                        requiredTag, requiredTag);
+                }

Review Comment:
   Not sure what you mean by "reject it" @chickenchickenlove ?
   
   `rack.aware.assignment.tags` are the tags configured for the assignor to use when computing the assignment. The client does not use them itself, so there is nothing to reject.
   
   The logging here informs the developer/operator of the client that the broker uses some tags for rack-aware assignment but the client doesn't set them, so rack-aware assignment might not work as expected.
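   
   To make the intent concrete, here is a minimal, self-contained sketch of the check. The class and method names (`MissingClientTagCheck`, `missingTags`) are hypothetical and not part of this PR; the sketch only assumes that a null tag list means "unchanged since last heartbeat", as the schema's `about` text states. It is purely informational: it computes what to warn about and rejects nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; these names do not exist in the PR.
final class MissingClientTagCheck {

    // Returns the tag keys that the broker uses for rack-aware assignment
    // but that are absent from this client's configured client tags.
    static List<String> missingTags(List<String> requiredTags, Map<String, String> clientTags) {
        List<String> missing = new ArrayList<>();
        if (requiredTags == null) {
            // Assumption: null means "unchanged since last heartbeat", so there is nothing to check.
            return missing;
        }
        for (String requiredTag : requiredTags) {
            if (!clientTags.containsKey(requiredTag)) {
                missing.add(requiredTag);
            }
        }
        return missing;
    }
}
```

   A caller would log one warning per entry of the returned list and otherwise continue normally, since the assignor on the broker is the only consumer of these tags.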



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########


Review Comment:
   This will be fixed via https://github.com/apache/kafka/pull/21799



##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -52,6 +52,8 @@
       "about": "The maximal lag a warm-up task can have to be considered caught-up." },
     { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
       "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
+    { "name": "RackAwareAssignmentTags", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The list of client tag keys used for rack-aware standby task assignment. Null if unchanged since last heartbeat." },

Review Comment:
   Yes, this needs to be changed to `"versions": "1+"` and `"nullableVersions": "1+"`.
   
   But this PR depends on https://github.com/apache/kafka/pull/21799, so I guess it's ok for now and we can fix it after the other PR is merged.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -292,6 +298,11 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+        .define(STREAMS_RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+            LIST,
+            GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DEFAULT,
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DOC)

Review Comment:
   I believe the `LIST` type takes care of this already? We have an existing test for the "classic" client-side config, `StreamsConfigTest#shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithEmptyTag`, and it should work the same on the broker side?
   
   Might be good to add a test for it (maybe for both the coordinator config and the group config)?
   
   Which also reminds me that we might want a guard on both configs (especially the coordinator one; for the group one, we can simply reject the config change with an error?) to ensure that the group coordinator starts up even if `group.streams.rack.aware.assignment.tags` is set to something invalid (and maybe fall back to the default empty "string"). Would be good to hear from one of @lucasbru @lianetm @AndrewJSchofield @dajac @squah-confluent about it?
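   
   A rough sketch of the two guards, to make the proposal easier to discuss. Everything here is an assumption for illustration: the class `RackAwareTagsGuard`, its methods, and the validity rule (no null or blank tag keys) are hypothetical and are not existing Kafka code; the real rule would be whatever the `LIST` validation ends up enforcing.

```java
import java.util.List;

// Hypothetical illustration only; not part of GroupCoordinatorConfig or GroupConfig.
final class RackAwareTagsGuard {

    // Assumed validity rule: the list is present and no tag key is null or blank.
    static boolean isValid(List<String> tags) {
        if (tags == null) {
            return false;
        }
        for (String tag : tags) {
            if (tag == null || tag.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Coordinator-level guard: never fail startup, fall back to the empty default.
    static List<String> tagsOrDefault(List<String> configured) {
        return isValid(configured) ? List.copyOf(configured) : List.of();
    }

    // Group-level guard: reject the config change with an error.
    static void validateGroupConfig(List<String> configured) {
        if (!isValid(configured)) {
            throw new IllegalArgumentException(
                "Invalid rack-aware assignment tags: " + configured);
        }
    }
}
```

   The asymmetry is deliberate in this sketch: a bad static broker config degrades to "no rack-aware tags" so that the group coordinator still starts, while a bad dynamic group config change is refused up front so that the caller sees the error.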



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -693,7 +699,6 @@ public GroupCoordinatorConfig(AbstractConfig config) {
             String.format("%s must be less than or equal to %s",
                 SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
 
-

Review Comment:
   nit: keep double empty line to separate "share" from "streams" section 
clearly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
