dajac commented on code in PR #19856:
URL: https://github.com/apache/kafka/pull/19856#discussion_r2121092739


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -142,7 +142,7 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro
 
     private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {

Review Comment:
   Let's extend unit tests to cover the rack field.



##########
clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
##########
@@ -73,6 +74,16 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
             this.groupInstanceId = Optional.empty();
         }
 
+        // The WorkerGroupMember in connect module also uses this class, but there is no client.rack in DistributedConfig.
+        // Ignore the rackId in that case to avoid ConfigException.
+        // The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
+        // Skip empty rackId to avoid InvalidRequestException.
+        if (config.values().containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG) && !config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty()) {
+            this.rackId = Optional.ofNullable(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
+        } else {
+            this.rackId = Optional.empty();
+        }

Review Comment:
   - Should we only do this if the protocolType is CONSUMER?
   - It may be simpler to read the config into a local variable and set this.rackId only if the value is non-null and non-empty.
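
   To make the two suggestions above concrete, here is a minimal sketch. It is not the PR's code: a plain Map stands in for AbstractConfig, and RackIdSketch, ProtocolKind and resolveRackId are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only: a plain Map stands in for AbstractConfig, and
// ProtocolKind / resolveRackId are hypothetical names, not Kafka APIs.
final class RackIdSketch {
    enum ProtocolKind { CONSUMER, CONNECT }

    static final String CLIENT_RACK = "client.rack";

    static Optional<String> resolveRackId(Map<String, ?> values, ProtocolKind kind) {
        // Assumption: only the consumer protocol carries a rack id.
        if (kind != ProtocolKind.CONSUMER) {
            return Optional.empty();
        }
        // Read the value once into a local; a missing key yields null.
        Object raw = values.get(CLIENT_RACK);
        if (raw instanceof String && !((String) raw).isEmpty()) {
            return Optional.of((String) raw);
        }
        // Missing, null, or empty values are all treated as "no rack".
        return Optional.empty();
    }
}
```

   Reading the value once collapses the containsKey check, the emptiness check and the assignment into a single branch, and the missing-key case falls out of the null check.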



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -262,6 +262,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
             // InstanceId - set if present
             membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
+            // RackId - set if present
+            membershipManager.rackId().ifPresent(data::setRackId);

Review Comment:
   I think that it should only be set if it changed since the last request or if we send a full request.
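
   One way to read this suggestion: include the rack id only when its value differs from what was last sent, or when a full request is being built. A minimal sketch of that pattern follows. SentRackTracker and rackIdToSend are hypothetical names and not part of the Kafka client; the real request manager may track sent fields differently.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: SentRackTracker is not a Kafka class.
final class SentRackTracker {
    // Last value placed in a request; null until something has been sent.
    private String lastSentRackId;

    // Returns the rack id to put in the next request, or empty to omit it.
    Optional<String> rackIdToSend(Optional<String> current, boolean fullRequest) {
        String value = current.orElse(null);
        // Send on a full request, or when the value differs from the last one sent.
        if (fullRequest || !Objects.equals(value, lastSentRackId)) {
            lastSentRackId = value;
            return Optional.ofNullable(value);
        }
        return Optional.empty();
    }
}
```

   With this shape, the call site could stay close to the existing line, for example `tracker.rackIdToSend(membershipManager.rackId(), fullRequest).ifPresent(data::setRackId)`, where `tracker` and `fullRequest` are assumed to be available in the surrounding code.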



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java:
##########
@@ -216,6 +222,92 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
         }
     }
 
+    @ClusterTest(

Review Comment:
   I wonder whether we could add another test which verifies that a rebalance is triggered when the racks of a partition have changed. Have you considered it?


