lianetm commented on code in PR #16310:
URL: https://github.com/apache/kafka/pull/16310#discussion_r1638974414


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -574,6 +581,148 @@ public void testPollLongThrowsException() {
             "This method is deprecated and will be removed in the next major release.", e.getMessage());
     }
 
+    @Test
+    public void testOffsetFetchStoresPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the
+        // timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent
+        // was not enqueued. On this attempt the Future returns successfully, clearing the pending fetch.
+        event.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event,
+        // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced.
+        // Verify that another FetchCommittedOffsetsEvent is enqueued.
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event2 = getLastEnqueuedEvent();
+        assertNotEquals(event1, event2);
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // For the third attempt, the event from attempt 2 is reused, so there should not have been another
+        // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll().
+        // This will finally clear out the pending fetch.
+        event2.future().complete(Collections.emptyMap());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class));
+        assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertFalse(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @Test
+    public void testOffsetFetchDoesNotReuseExpiredPendingEvent() {
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within
+        // the timeout, leaving a pending fetch.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        // Sleep past the event's expiration, causing the poll() to *not* reuse the pending fetch. A new event
+        // is created and added to the application event queue.
+        time.sleep(event1.deadlineMs() - time.milliseconds());
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event2 = getLastEnqueuedEvent();
+        assertNotEquals(event1, event2);
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetFetchExceptionSupplier")
+    public void testOffsetFetchPendingEventErrors(Throwable reportedFailure,
+                                                  boolean expectErrorFromPoll,
+                                                  boolean expectPending) {
+        // Interrupt the thread and then clear it so that the interrupt flag is in a known good state before starting.
+        try {
+            Thread.currentThread().interrupt();
+        } finally {
+            assertTrue(Thread.interrupted());
+        }
+
+        consumer = newConsumer();
+        long timeoutMs = 0;
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
+
+        // Set up a pending offset fetch event.
+        consumer.poll(Duration.ofMillis(timeoutMs));
+        verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
+        CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event1 = getLastEnqueuedEvent();
+        assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
+        assertTrue(consumer.hasPendingOffsetFetchEvent());
+
+        clearInvocations(applicationEventHandler);
+
+        assertNull(consumerUtilsMockedStatic);
+        consumerUtilsMockedStatic = mockStatic(ConsumerUtils.class);
+        consumerUtilsMockedStatic.when(() -> ConsumerUtils.getResult(any(Future.class), any(Timer.class))).thenThrow(reportedFailure);
+        consumerUtilsMockedStatic.when(() -> ConsumerUtils.getResult(any(Future.class))).thenThrow(reportedFailure);
+        consumerUtilsMockedStatic.when(() -> ConsumerUtils.maybeWrapAsKafkaException(any(Throwable.class))).thenCallRealMethod();

Review Comment:
   What about this (passes locally for me):
   ```
       @Test
       public void testOffsetFetchInterrupt() {
           consumer = newConsumer();
           long timeoutMs = 0;
           doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
           consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));

           consumer.poll(Duration.ofMillis(timeoutMs));
           verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
           CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
           assertTrue(consumer.hasPendingOffsetFetchEvent());

           event.future().completeExceptionally(new InterruptException(""));
           assertThrows(InterruptException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs)));
           assertTrue(consumer.hasPendingOffsetFetchEvent());
       }
   ```
   
   Similar for Runtime and Timeout but adjusting the expectations:
   ```
           // Runtime
           event.future().completeExceptionally(new RuntimeException(""));
           assertThrows(KafkaException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs)));
           assertFalse(consumer.hasPendingOffsetFetchEvent());

           // Timeout
           event.future().completeExceptionally(new TimeoutException(""));
           assertDoesNotThrow(() -> consumer.poll(Duration.ofMillis(timeoutMs)));
           assertTrue(consumer.hasPendingOffsetFetchEvent());
   ```
   
   All 3 passed for me. (Again, suggesting this to simplify by getting rid of all the static mocking and warnings, but feel free to leave it for a follow-up if you prefer.)
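   
   For reference, a minimal sketch of what the Runtime and Timeout variants could look like as standalone tests, assembled from the snippets above. The test names are illustrative, and it assumes the same fixtures as the suggested testOffsetFetchInterrupt (newConsumer(), fetchCollector, applicationEventHandler, getLastEnqueuedEvent()):
   ```
       @Test
       public void testOffsetFetchRuntimeException() {
           consumer = newConsumer();
           long timeoutMs = 0;
           doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
           consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));

           // Enqueue the offset fetch and leave it pending.
           consumer.poll(Duration.ofMillis(timeoutMs));
           verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
           CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
           assertTrue(consumer.hasPendingOffsetFetchEvent());

           // A RuntimeException surfaces from poll() as a KafkaException and the pending fetch is cleared.
           event.future().completeExceptionally(new RuntimeException(""));
           assertThrows(KafkaException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs)));
           assertFalse(consumer.hasPendingOffsetFetchEvent());
       }

       @Test
       public void testOffsetFetchTimeoutException() {
           consumer = newConsumer();
           long timeoutMs = 0;
           doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
           consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));

           // Enqueue the offset fetch and leave it pending.
           consumer.poll(Duration.ofMillis(timeoutMs));
           verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class));
           CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> event = getLastEnqueuedEvent();
           assertTrue(consumer.hasPendingOffsetFetchEvent());

           // A TimeoutException is swallowed by poll(), and the pending fetch is kept for the next attempt.
           event.future().completeExceptionally(new TimeoutException(""));
           assertDoesNotThrow(() -> consumer.poll(Duration.ofMillis(timeoutMs)));
           assertTrue(consumer.hasPendingOffsetFetchEvent());
       }
   ```
   Completing the futures directly keeps the expectations explicit per error type without any static mocking of ConsumerUtils.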



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
