lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1761365984


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1341,6 +1342,79 @@ public void testCloseAwaitPendingAsyncCommitComplete() {
         assertEquals(1, cb.invoked);
     }
 
+    @Test
+    public void testCloseWithInterruptUsingDefaultTimeout() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        consumer = spy(newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            "group-id",
+            "client-id"));
+
+        // This future is completed when the ConsumerRebalanceListener has been invoked and the
+        // ConsumerRebalanceListenerCallbackCompletedEvent has been enqueued.
+        CompletableFuture<Void> crlCallbackCompletedFuture = new CompletableFuture<>();
+
+        doAnswer(invocation -> {
+            // When an UnsubscribeEvent is enqueued, don't complete it immediately. Instead, enqueue the 'rebalance
+            // callback needed' event from the background thread.
+            SortedSet<TopicPartition> partitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+            partitions.addAll(subscriptions.assignedPartitions());
+            backgroundEventQueue.add(new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, partitions));
+
+            // Complete the unsubscribe event when the ConsumerRebalanceListenerCallbackCompletedEvent has been
+            // enqueued.
+            UnsubscribeEvent event = invocation.getArgument(0);
+            crlCallbackCompletedFuture.whenComplete((result, exception) -> {
+                if (exception != null)
+                    event.future().completeExceptionally(exception);
+                else
+                    event.future().complete(result);
+            });
+
+            return null;
+        }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
+
+        doAnswer(invocation -> {
+            // This triggers the completion of the UnsubscribeEvent above.
+            crlCallbackCompletedFuture.complete(null);
+            return null;
+        }).when(applicationEventHandler).add(ArgumentMatchers.isA(ConsumerRebalanceListenerCallbackCompletedEvent.class));
+
+        try {
+            Thread.currentThread().interrupt();
+            assertThrows(InterruptException.class, () -> consumer.close());
+        } finally {
+            Thread.interrupted();
+        }
+
+        verifyUnsubscribeEvent(subscriptions);
+        verify(applicationEventHandler).add(any(ConsumerRebalanceListenerCallbackCompletedEvent.class));
+    }
+
+    @Test
+    public void testCloseWithInterruptUsingZeroTimeout() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        consumer = spy(newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            "group-id",
+            "client-id"));
+
+        try {
+            Thread.currentThread().interrupt();
+            assertThrows(InterruptException.class, () -> consumer.close(Duration.ZERO));
+        } finally {
+            Thread.interrupted();
+        }
+
+        verifyUnsubscribeEvent(subscriptions);
+        verify(applicationEventHandler, never()).add(any(ConsumerRebalanceListenerCallbackCompletedEvent.class));

Review Comment:
   Couldn't this happen sometimes? I expect that close with a zero (or low) timeout could still trigger the callbacks. We just can't guarantee that it will, since for now it depends on timing between the two threads, I guess.
   
   I expect it could trigger the callbacks if the background thread gets to process the UnsubscribeEvent between the moment the app thread adds it:
   
https://github.com/apache/kafka/blob/086038dc3aec4664e7df272755f9a84ab627965a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1289
   
   and the moment it times out:
   
https://github.com/apache/kafka/blob/086038dc3aec4664e7df272755f9a84ab627965a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1945
   
   If my expectation is right, honestly I would say we can't add a test for close(0) that verifies whether the callbacks run, at least for now. We really can't control what happens yet. Makes sense?
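
   To make the race concrete, here is a minimal, self-contained sketch. It is a toy model and not the actual AsyncKafkaConsumer code: the class `CloseRaceSketch`, the method `callbackInvoked`, and the rule that an already-expired event is skipped by the background thread are all assumptions made for illustration. The `backgroundWinsRace` flag forces one of the two interleavings that the scheduler would otherwise pick for us.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseRaceSketch {

    /**
     * Toy model of close(Duration.ZERO). Returns true if the "callback" ran.
     * Hypothetical helper: names and the expiry rule are assumptions, not Kafka internals.
     */
    public static boolean callbackInvoked(boolean backgroundWinsRace) throws InterruptedException {
        BlockingQueue<CompletableFuture<Void>> eventQueue = new LinkedBlockingQueue<>();
        AtomicBoolean invoked = new AtomicBoolean(false);

        // Stand-in for the background thread: take one event and, unless it has
        // already expired, run the "callback" and complete the event.
        Thread background = new Thread(() -> {
            try {
                CompletableFuture<Void> event = eventQueue.take();
                if (!event.isDone()) {
                    invoked.set(true);
                    event.complete(null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stand-in for the app thread adding the unsubscribe event.
        CompletableFuture<Void> unsubscribe = new CompletableFuture<>();
        eventQueue.add(unsubscribe);

        if (backgroundWinsRace) {
            // Interleaving 1: the event is processed before the timeout check.
            background.start();
            background.join();
        }

        // Zero timeout: the app thread checks once and expires the event if it is not done.
        if (!unsubscribe.isDone()) {
            unsubscribe.completeExceptionally(new TimeoutException("expired"));
        }

        if (!backgroundWinsRace) {
            // Interleaving 2: the background thread only sees the event after it expired.
            background.start();
            background.join();
        }
        return invoked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("background wins: callback invoked = " + callbackInvoked(true));
        System.out.println("timeout wins:    callback invoked = " + callbackInvoked(false));
    }
}
```

   In this model both outcomes are reachable with the same zero timeout, which is why a `never()` verification on the callback-completed event would only pass when the timeout happens to win.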



