Copilot commented on code in PR #21526:
URL: https://github.com/apache/kafka/pull/21526#discussion_r2833721764


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1247,4 +1248,98 @@ public void testCancelTimers() {
 
         verify(timer).cancel("initial-rebalance-timeout-test-group");
     }
+
+    // Endpoint-to-partitions cache tests
+
+    @Test
+    public void testGetCachedEndpointToPartitionsReturnsEmptyWhenNoCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> cached =
+            streamsGroup.getCachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isEmpty());
+    }
+
+    @Test
+    public void testCacheEndpointToPartitionsAndRetrieve() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+        endpointToPartitions.setUserEndpoint(
+            new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)
+        );
+
+        streamsGroup.cacheEndpointToPartitions("member-1", endpointToPartitions);
+        Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> cached =
+            streamsGroup.getCachedEndpointToPartitions("member-1");
+
+        assertTrue(cached.isPresent());
+        assertEquals(endpointToPartitions, cached.get());
+    }
+
+    @Test
+    public void testUpdateMemberInvalidatesCache() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+
+        // Create initial member
+        StreamsGroupMember member = StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(member);
+
+        // Cache endpoint info
+        streamsGroup.cacheEndpointToPartitions("member-1", endpointToPartitions);
+
+        // Update member
+        StreamsGroupMember updatedMember = StreamsGroupMember.Builder.withDefaults("member-1")
+            .setProcessId("process-1")
+            .build();
+        streamsGroup.updateMember(updatedMember);
+
+        // Cache should be invalidated
+        assertTrue(streamsGroup.getCachedEndpointToPartitions("member-1").isEmpty());
+    }

Review Comment:
   This test does not actually verify that the cache is invalidated when
   assigned tasks change. It builds two members with identical default values
   (both with empty/null assigned tasks), so the task-change invalidation logic
   is never exercised. The test should build the initial and the updated member
   with different assigned tasks so that it validates the cache invalidation
   behavior.
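
   To illustrate the point, here is a minimal, self-contained sketch. It does
   not use the real `StreamsGroup` or `StreamsGroupMember` classes: `Group`,
   `Member`, `cacheEndpoint`, `getCachedEndpoint`, and `cacheSurvivesUpdate`
   are hypothetical stand-ins, and the rule "invalidate only when the assigned
   tasks differ" is an assumption about the intended behavior, not a statement
   about the PR's implementation. It only shows why the two updates need
   different task sets for the check to be meaningful.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class CacheInvalidationSketch {

    /** Hypothetical stand-in for a group member; only the fields the sketch needs. */
    static final class Member {
        final String memberId;
        final Set<Integer> assignedTasks;

        Member(String memberId, Set<Integer> assignedTasks) {
            this.memberId = memberId;
            this.assignedTasks = assignedTasks;
        }
    }

    /** Hypothetical stand-in for the group; NOT the real StreamsGroup. */
    static final class Group {
        private final Map<String, Member> members = new HashMap<>();
        private final Map<String, String> endpointCache = new HashMap<>();

        void cacheEndpoint(String memberId, String endpoint) {
            endpointCache.put(memberId, endpoint);
        }

        Optional<String> getCachedEndpoint(String memberId) {
            return Optional.ofNullable(endpointCache.get(memberId));
        }

        void updateMember(Member updated) {
            Member previous = members.put(updated.memberId, updated);
            // Assumed rule: drop the cached entry only when the assigned tasks differ.
            if (previous != null && !previous.assignedTasks.equals(updated.assignedTasks)) {
                endpointCache.remove(updated.memberId);
            }
        }
    }

    /** Registers a member, caches an endpoint, updates the member, and reports whether the cache entry survived. */
    public static boolean cacheSurvivesUpdate(Set<Integer> tasksBefore, Set<Integer> tasksAfter) {
        Group group = new Group();
        group.updateMember(new Member("member-1", tasksBefore));
        group.cacheEndpoint("member-1", "localhost:9092");
        group.updateMember(new Member("member-1", tasksAfter));
        return group.getCachedEndpoint("member-1").isPresent();
    }

    public static void main(String[] args) {
        // Identical tasks: the task-change branch is never taken, so the entry survives.
        System.out.println(cacheSurvivesUpdate(Set.of(0, 1), Set.of(0, 1))); // prints "true"
        // Different tasks: the task-change branch runs and the entry is removed.
        System.out.println(cacheSurvivesUpdate(Set.of(0, 1), Set.of(2)));    // prints "false"
    }
}
```

   In the actual test, the equivalent change would be to build `updatedMember`
   with assigned tasks that differ from those of `member`, and ideally to add a
   second case where the tasks are unchanged. The exact builder calls depend on
   the `StreamsGroupMember` API and are not shown here.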


