lucasbru commented on code in PR #21725:
URL: https://github.com/apache/kafka/pull/21725#discussion_r2930175673


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -558,6 +559,7 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
                     processId,
                     config,
                     parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+                    parseRackId((String) config.originals().get(CommonClientConfigs.CLIENT_RACK_CONFIG)),

Review Comment:
   Is there a reason to use `config.originals().get(...)` with a cast instead 
of a typed accessor like 
`config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG)`?
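
   To make the trade-off behind this question concrete, here is a minimal, self-contained sketch. `SketchConfig`, `rackViaOriginals`, and `rackViaAccessor` are hypothetical names invented for illustration; the class is not Kafka's config class and makes no claim about which keys `StreamsConfig` actually declares. It only assumes that a typed accessor works on declared, parsed keys while `originals()` exposes the raw user-supplied values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a typed config class; NOT Kafka's AbstractConfig.
final class SketchConfig {
    private final Map<String, Object> originals;
    private final Map<String, String> parsed = new HashMap<>();

    SketchConfig(final Map<String, Object> originals, final String... definedKeys) {
        this.originals = new HashMap<>(originals);
        for (final String key : definedKeys) {
            final Object raw = originals.get(key);
            // Parsing step: coerce the user-supplied value to a String once, up front.
            parsed.put(key, raw == null ? null : String.valueOf(raw));
        }
    }

    // Raw, unvalidated view of what the user passed in.
    Map<String, Object> originals() {
        return originals;
    }

    // Typed accessor: only works for keys that were declared and parsed.
    String getString(final String key) {
        if (!parsed.containsKey(key)) {
            throw new IllegalArgumentException("Unknown configuration '" + key + "'");
        }
        return parsed.get(key);
    }

    // Variant 1: raw lookup plus cast, the shape used in the diff above.
    static String rackViaOriginals(final SketchConfig config) {
        return (String) config.originals().get("client.rack");
    }

    // Variant 2: typed accessor, the shape suggested in this comment.
    static String rackViaAccessor(final SketchConfig config) {
        return config.getString("client.rack");
    }
}
```

   In this sketch the cast variant throws `ClassCastException` when the user supplies a non-String value, while the accessor variant fails for a key that was never declared. Which failure mode applies to the real code depends on whether the key is declared in the config definition, which is what the question above is asking.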



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java:
##########
@@ -288,14 +288,15 @@ public void topicInfoShouldNotAcceptNulls() {
     }
 
     @Test
-    public void streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
+    public void streamsRebalanceDataShouldNotHaveModifiableSubtopologiesClientTagsRackId() {

Review Comment:
   The test was renamed to mention `RackId`, but no assertion about `rackId` 
was added. Either revert the rename or add a relevant assertion (though 
`Optional<String>` is inherently immutable, so arguably the rename is 
unnecessary).
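
   As a rough illustration of why the two kinds of field differ, consider the sketch below. `RebalanceDataSketch` and `rejectsMutation` are hypothetical names made up for this example and do not reflect the real `StreamsRebalanceData` class. A collection getter needs a "not modifiable" check because the returned map could otherwise be mutated; `Optional<String>` exposes no mutators, so the only meaningful assertion for it is on its value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical holder for illustration; NOT the real StreamsRebalanceData.
final class RebalanceDataSketch {
    private final Map<String, String> clientTags;
    private final Optional<String> rackId;

    RebalanceDataSketch(final Map<String, String> clientTags, final Optional<String> rackId) {
        // Defensive copy wrapped in an unmodifiable view.
        this.clientTags = Collections.unmodifiableMap(new HashMap<>(clientTags));
        this.rackId = rackId;
    }

    Map<String, String> clientTags() {
        return clientTags;
    }

    Optional<String> rackId() {
        return rackId;
    }

    // Returns true if the given map refuses a put, i.e. it is unmodifiable.
    static boolean rejectsMutation(final Map<String, String> map) {
        try {
            map.put("k", "v");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
```

   A test in this style would check `rejectsMutation(data.clientTags())` for the collection and, if `rackId` is to be covered at all, compare `data.rackId()` against the expected `Optional` value.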



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -624,6 +626,56 @@ public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean in
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestRackIdSentWhenJoining(final MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
+            new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+                streamsRebalanceData,
+                membershipManager,
+                1234
+            );
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
+
+        assertEquals(RACK_ID, requestData1.rackId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.rackId());
+    }
+
+    @ParameterizedTest

Review Comment:
   Nit: typo in method name — 
`testBuildingHeartbeatRequesClientTagSentWhenJoining` is missing a `t` in 
"Request". Should be `testBuildingHeartbeatRequestClientTagSentWhenJoining`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.