Copilot commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2833721722
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -433,6 +445,7 @@ public void updateMember(StreamsGroupMember newMember) {
maybeUpdateTaskProcessId(oldMember, newMember);
updateStaticMember(newMember);
maybeUpdateGroupState();
+ endpointToPartitionsCache.remove(newMember.memberId());
Review Comment:
The cache is being invalidated on every member update, but according to the
PR description, it should only be invalidated when a member's assigned tasks
change. This overly aggressive invalidation defeats the purpose of the cache,
as the cache will be cleared even when only non-task-related member fields are
updated (e.g., member epoch, heartbeat timestamp, etc.). Consider using
StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember) to
conditionally invalidate the cache only when tasks actually change.
```suggestion
if (oldMember == null ||
StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember)) {
endpointToPartitionsCache.remove(newMember.memberId());
}
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1247,4 +1248,98 @@ public void testCancelTimers() {
verify(timer).cancel("initial-rebalance-timeout-test-group");
}
+
+ // Endpoint-to-partitions cache tests
+
+ @Test
+ public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cached =
+ streamsGroup.getCachedEndpointToPartitions("member-1");
+
+ assertTrue(cached.isEmpty());
+ }
+
+ @Test
+ public void testCacheEndpointToPartitionsAndRetrieve() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ endpointToPartitions.setUserEndpoint(
+ new
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+ );
+
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+ Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
cached =
+ streamsGroup.getCachedEndpointToPartitions("member-1");
+
+ assertTrue(cached.isPresent());
+ assertEquals(endpointToPartitions, cached.get());
+ }
+
+ @Test
+ public void testUpdateMemberInvalidatesCache() {
+ StreamsGroup streamsGroup = createStreamsGroup("test-group");
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
+ new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+ // Create initial member
+ StreamsGroupMember member =
StreamsGroupMember.Builder.withDefaults("member-1")
+ .setProcessId("process-1")
+ .build();
+ streamsGroup.updateMember(member);
+
+ // Cache endpoint info
+ streamsGroup.cacheEndpointToPartitions("member-1",
endpointToPartitions);
+
+ // Update member
+ StreamsGroupMember updatedMember =
StreamsGroupMember.Builder.withDefaults("member-1")
+ .setProcessId("process-1")
+ .build();
+ streamsGroup.updateMember(updatedMember);
+
+ // Cache should be invalidated
+
assertTrue(streamsGroup.getCachedEndpointToPartitions("member-1").isEmpty());
+ }
Review Comment:
This test should be updated to actually verify that the cache is invalidated
when assigned tasks change. Currently, it creates two members with identical
default values (both with empty/null assigned tasks), which doesn't test the
task-change invalidation logic. The test should create members with different
assigned tasks to properly validate the cache invalidation behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]