lianetm commented on code in PR #19885:
URL: https://github.com/apache/kafka/pull/19885#discussion_r2538567773


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -751,6 +751,56 @@ public void testOffsetFetchRequestEnsureDuplicatedRequestSucceed() {
         assertEmptyPendingRequests(commitRequestManager);
     }
 
+    @Test
+    public void testOffsetFetchRequestShouldSucceedWithTopicId() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        Uuid topicId = Uuid.randomUuid();
+        when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, "t1"));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(new TopicPartition("t1", 0));
+
+        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+            commitRequestManager,
+            partitions,
+            2,
+            Errors.NONE,
+            true,
+            topicId);
+        futures.forEach(f -> {
+            assertTrue(f.isDone());
+            assertFalse(f.isCompletedExceptionally());
+        });
+        // expecting the buffers to be emptied after being completed successfully
+        commitRequestManager.poll(0);
+        assertEmptyPendingRequests(commitRequestManager);
+    }
+
+    @Test
+    public void testOffsetFetchRequestShouldFailWithTopicIdWhenMetadataUnknownResponseTopicId() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        Uuid topicId = Uuid.randomUuid();
+        when(metadata.topicIds()).thenReturn(Map.of("t1", topicId));
+        // Mock the scenario where the topicID from the response is not in the metadata.
+        when(metadata.topicNames()).thenReturn(Map.of());
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(new TopicPartition("t1", 0));
+
+        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+            commitRequestManager,
+            partitions,
+            1,
+            Errors.UNKNOWN_TOPIC_OR_PARTITION,

Review Comment:
   This generates a response with an error at the group level, which is not
exactly the scenario we want to test.
   
   We are after the case where we use topic IDs and the response has error
`NONE`, but the topic ID is no longer in the metadata. On fetch, that is
expected to fail with `KafkaException` (not retriable; sorry, I mentioned
testRetriable before, but that is the case for commit, not for fetch). So we
just need to pass `NONE` and assert that the future completes exceptionally
with `KafkaException`. I expect that covers our scenario.
   
https://github.com/apache/kafka/blob/0491e1124eaa163a7ed7c139ab477c9f4a7b71c9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1142
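
To make the requested assertion shape concrete, here is a minimal, self-contained sketch. It is not the Kafka code: `TopicIdFetchSketch`, `handleFetchResponse`, `failureOf`, and the nested `KafkaException` are hypothetical stand-ins, and the lookup logic only models the behavior described in the comment above (response error is `NONE`, but the response topic ID cannot be resolved to a name). In the real test the existing helper would be called with `Errors.NONE` and the assertions would target `org.apache.kafka.common.KafkaException`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class TopicIdFetchSketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption, for illustration only).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }
    }

    // Hypothetical model of handling a fetch response that carries no error:
    // the topic ID from the response must still be resolvable through metadata.
    static CompletableFuture<Map<String, Long>> handleFetchResponse(
            UUID responseTopicId, long offset, Map<UUID, String> topicNames) {
        CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
        String topicName = topicNames.get(responseTopicId);
        if (topicName == null) {
            // Topic ID is no longer in metadata: fail the future, do not retry.
            future.completeExceptionally(
                new KafkaException("Unknown topic ID in response: " + responseTopicId));
        } else {
            future.complete(Map.of(topicName + "-0", offset));
        }
        return future;
    }

    // Returns the exception the future failed with, or null if it completed normally.
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID();

        // Metadata has no name for the response topic ID, mirroring
        // when(metadata.topicNames()).thenReturn(Map.of()) in the test under review.
        CompletableFuture<Map<String, Long>> future =
            handleFetchResponse(topicId, 42L, Map.of());

        if (!future.isCompletedExceptionally()) {
            throw new AssertionError("expected the future to complete exceptionally");
        }
        if (!(failureOf(future) instanceof KafkaException)) {
            throw new AssertionError("expected the failure to be a KafkaException");
        }
    }
}
```

The two checks in `main` correspond to what the test would assert on each returned future: that it is completed exceptionally, and that the failure is a `KafkaException`.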


