lucasbru commented on code in PR #21725:
URL: https://github.com/apache/kafka/pull/21725#discussion_r2930175673
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -558,6 +559,7 @@ private static MainConsumerSetup setupMainConsumer(final
TopologyMetadata topolo
processId,
config,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+ parseRackId((String)
config.originals().get(CommonClientConfigs.CLIENT_RACK_CONFIG)),
Review Comment:
Is there a reason to use `config.originals().get(...)` with a cast instead
of a typed accessor like
`config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG)`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java:
##########
@@ -288,14 +288,15 @@ public void topicInfoShouldNotAcceptNulls() {
}
@Test
- public void
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
+ public void
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesClientTagsRackId() {
Review Comment:
The test was renamed to mention `RackId`, but no assertion about `rackId`
was added. Either revert the rename or add a relevant assertion (though
`Optional<String>` is inherently immutable, so arguably the rename is
unnecessary).
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -624,6 +626,56 @@ public void
testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean in
}
}
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatRequestRackIdSentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+ streamsRebalanceData,
+ membershipManager,
+ 1234
+ );
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(RACK_ID, requestData1.rackId());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.rackId());
+ }
+
+ @ParameterizedTest
Review Comment:
Nit: typo in method name —
`testBuildingHeartbeatRequesClientTagSentWhenJoining` is missing a `t` in
"Request". Should be `testBuildingHeartbeatRequestClientTagSentWhenJoining`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]