Copilot commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2833721722


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -433,6 +445,7 @@ public void updateMember(StreamsGroupMember newMember) {
         maybeUpdateTaskProcessId(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
+        endpointToPartitionsCache.remove(newMember.memberId());

Review Comment:
   The cache is invalidated on every member update, but according to the PR
description it should be invalidated only when a member's assigned tasks
change. Invalidating this aggressively defeats the purpose of the cache,
because entries are dropped even when only non-task-related member fields
change (e.g., member epoch or heartbeat timestamp). Consider using
StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember) to
invalidate the cache only when the tasks actually change.
   ```suggestion
           if (oldMember == null || StreamsGroupMember.hasAssignedTasksChanged(oldMember, newMember)) {
               endpointToPartitionsCache.remove(newMember.memberId());
           }
   ```
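
The conditional invalidation suggested above can be sketched in isolation. This is a minimal illustration, not the Kafka implementation: `Member`, `cacheEndpoint`, `cachedEndpoint`, and the string-valued cache are hypothetical stand-ins for `StreamsGroupMember` and the endpoint-to-partitions cache, and the set comparison stands in for `hasAssignedTasksChanged`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are NOT the Kafka classes.
public class ConditionalInvalidationSketch {

    public static final class Member {
        final String memberId;
        final int memberEpoch;
        final Set<String> assignedTasks;

        public Member(String memberId, int memberEpoch, Set<String> assignedTasks) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
            this.assignedTasks = Set.copyOf(assignedTasks);
        }
    }

    private final Map<String, Member> members = new HashMap<>();
    private final Map<String, String> endpointCache = new HashMap<>();

    public void cacheEndpoint(String memberId, String endpointInfo) {
        endpointCache.put(memberId, endpointInfo);
    }

    public Optional<String> cachedEndpoint(String memberId) {
        return Optional.ofNullable(endpointCache.get(memberId));
    }

    public void updateMember(Member newMember) {
        Member oldMember = members.put(newMember.memberId, newMember);
        // Drop the cached entry only for a new member or a changed task set
        // (assumed here to be the intent of hasAssignedTasksChanged).
        if (oldMember == null || !oldMember.assignedTasks.equals(newMember.assignedTasks)) {
            endpointCache.remove(newMember.memberId);
        }
    }

    public static void main(String[] args) {
        ConditionalInvalidationSketch group = new ConditionalInvalidationSketch();
        group.updateMember(new Member("member-1", 1, Set.of("0_0")));
        group.cacheEndpoint("member-1", "localhost:9092");

        // Epoch-only update: the cached entry is kept.
        group.updateMember(new Member("member-1", 2, Set.of("0_0")));
        System.out.println(group.cachedEndpoint("member-1").isPresent());

        // Task change: the cached entry is dropped.
        group.updateMember(new Member("member-1", 3, Set.of("0_0", "0_1")));
        System.out.println(group.cachedEndpoint("member-1").isPresent());
    }
}
```

In this sketch an epoch bump alone leaves the cached entry in place, which is the behavior the PR description asks for.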



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1247,4 +1248,98 @@ public void testCancelTimers() {
 
         verify(timer).cancel("initial-rebalance-timeout-test-group");
     }
+
+    // Endpoint-to-partitions cache tests
+
+    @Test
+    public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> cached =
+            streamsGroup.getCachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isEmpty());
+    }
+
+    @Test
+    public void testCacheEndpointToPartitionsAndRetrieve() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+        endpointToPartitions.setUserEndpoint(
+            new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+        );
+
+        streamsGroup.cacheEndpointToPartitions("member-1", endpointToPartitions);
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> cached =
+            streamsGroup.getCachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isPresent());
+        assertEquals(endpointToPartitions, cached.get());
+    }
+
+    @Test
+    public void testUpdateMemberInvalidatesCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create initial member
+        StreamsGroupMember member = StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", endpointToPartitions);
+
+        // Update member
+        StreamsGroupMember updatedMember = StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(updatedMember);
+
+        // Cache should be invalidated
+        assertTrue(streamsGroup.getCachedEndpointToPartitions("member-1").isEmpty());
+    }

Review Comment:
   This test should verify that the cache is invalidated when assigned tasks
change. As written, it builds two members with identical default values (both
with empty or null assigned tasks), so it does not exercise the task-change
invalidation logic, and it would likely fail once invalidation is made
conditional on task changes. The test should create members with different
assigned tasks to validate the cache invalidation behavior.
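
The shape of such a test can be sketched against a simplified model. The `Group` class, its `put` and `isCached` helpers, and the task IDs such as "0_0" are hypothetical and only illustrative; a real test would build `StreamsGroupMember` instances with different assigned tasks and call `getCachedEndpointToPartitions` instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model used only to illustrate the test shape;
// it is not the Kafka StreamsGroup API.
public class TaskChangeInvalidationTestSketch {

    public static final class Group {
        private final Map<String, Set<String>> assignedTasks = new HashMap<>();
        private final Map<String, String> cache = new HashMap<>();

        public void updateMember(String memberId, Set<String> newTasks) {
            Set<String> oldTasks = assignedTasks.put(memberId, Set.copyOf(newTasks));
            // Invalidate only for a new member or a changed task set.
            if (oldTasks == null || !oldTasks.equals(newTasks)) {
                cache.remove(memberId);
            }
        }

        public void put(String memberId, String value) {
            cache.put(memberId, value);
        }

        public boolean isCached(String memberId) {
            return cache.containsKey(memberId);
        }
    }

    // Case 1: the second update carries different tasks, so the entry must go.
    public static boolean taskChangeInvalidatesCache() {
        Group group = new Group();
        group.updateMember("member-1", Set.of("0_0"));
        group.put("member-1", "localhost:9092");
        group.updateMember("member-1", Set.of("0_0", "0_1"));
        return !group.isCached("member-1");
    }

    // Case 2: the second update carries the same tasks, so the entry must stay.
    public static boolean unchangedTasksKeepCache() {
        Group group = new Group();
        group.updateMember("member-1", Set.of("0_0"));
        group.put("member-1", "localhost:9092");
        group.updateMember("member-1", Set.of("0_0"));
        return group.isCached("member-1");
    }

    public static void main(String[] args) {
        if (!taskChangeInvalidatesCache()) throw new AssertionError("expected invalidation");
        if (!unchangedTasksKeepCache()) throw new AssertionError("expected cache hit");
        System.out.println("both checks passed");
    }
}
```

Pairing the positive case with an unchanged-tasks case guards against the cache being cleared unconditionally.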



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
