kuoche1712003 commented on code in PR #19914:
URL: https://github.com/apache/kafka/pull/19914#discussion_r2266029745


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -178,6 +178,11 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (coordinatorRequestManager.coordinator().isEmpty() && closing) {
+            handleClosingWithoutCoordinator();
+            return EMPTY;
+        }
+
         // poll when the coordinator node is known and fatal error is not present
         if (coordinatorRequestManager.coordinator().isEmpty()) {
             pendingRequests.maybeFailOnCoordinatorFatalError();

Review Comment:
   Perhaps it could be written like this for better readability:
   ```suggestion
               pendingRequests.maybeFailOnCoordinatorFatalError();
               if (closing && pendingRequests.hasUnsentRequests()) {
                   CommitFailedException exception = new CommitFailedException(
                       "Failed to commit offsets: Coordinator unknown and consumer is closing");
                   pendingRequests.drainPendingCommits()
                       .forEach(request -> request.future().completeExceptionally(exception));
               }
   ```
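
The pattern in the suggestion (drain whatever is still queued and fail each future with one shared exception) can be sketched in isolation. This is only an illustration under stated assumptions: `PendingCommitsSketch`, `enqueue`, `drain`, and `failAllIfClosing` are hypothetical stand-ins rather than the Kafka classes, and `IllegalStateException` stands in for `CommitFailedException` so the snippet compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the manager's queue of unsent commits; not Kafka code.
class PendingCommitsSketch {
    private final List<CompletableFuture<Void>> unsent = new ArrayList<>();

    // Queues a new commit and returns the future its caller would wait on.
    CompletableFuture<Void> enqueue() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        unsent.add(future);
        return future;
    }

    boolean hasUnsentRequests() {
        return !unsent.isEmpty();
    }

    // Removes and returns everything queued, so a later call finds nothing left to fail.
    List<CompletableFuture<Void>> drain() {
        List<CompletableFuture<Void>> drained = new ArrayList<>(unsent);
        unsent.clear();
        return drained;
    }

    // Mirrors the suggested branch: act only when closing, with no coordinator and work still queued.
    void failAllIfClosing(boolean closing, boolean coordinatorKnown) {
        if (closing && !coordinatorKnown && hasUnsentRequests()) {
            // IllegalStateException is an assumption standing in for CommitFailedException.
            IllegalStateException exception = new IllegalStateException(
                "Failed to commit offsets: Coordinator unknown and consumer is closing");
            drain().forEach(future -> future.completeExceptionally(exception));
        }
    }
}
```

Draining before completing means each queued future is failed at most once by this path, and a second `poll` during close sees an empty queue.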



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques
         assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
     }
 
+    @Test
+    public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1),
+                new OffsetAndMetadata(0));
+
+        var commitFuture = commitRequestManager.commitAsync(offsets);
+
+        commitRequestManager.signalClose();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        when(coordinatorRequestManager.fatalError())
+                .thenReturn(Optional.of(new GroupAuthorizationException("Fatal error")));
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200));
+
+        assertTrue(commitFuture.isCompletedExceptionally());
+
+        ExecutionException exception = assertThrows(ExecutionException.class, commitFuture::get);
+
+        assertInstanceOf(GroupAuthorizationException.class, exception.getCause());
+        assertEquals("Fatal error", exception.getCause().getMessage());

Review Comment:
   Consider using `TestUtils.assertFutureThrows` here; it could replace the
   manual `assertThrows(ExecutionException.class, ...)` and `getCause()` checks.
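
To show what a helper of this kind typically does, here is a minimal sketch. `assertFutureFailsWith` is a hypothetical stand-in written for this comment, not the actual `TestUtils.assertFutureThrows` signature, and `FutureAssertSketch` is likewise an invented name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureAssertSketch {
    // Hypothetical helper: waits on the future, requires it to fail, and returns the typed cause.
    static <T extends Throwable> T assertFutureFailsWith(Class<T> expectedCause, Future<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!expectedCause.isInstance(cause)) {
                throw new AssertionError("Expected cause " + expectedCause.getName() + " but got " + cause);
            }
            return expectedCause.cast(cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing the assertion.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        // Reached only if get() returned normally, i.e. the future did not fail.
        throw new AssertionError("Expected the future to fail with " + expectedCause.getName());
    }
}
```

With a helper shaped like this, the three assertions on `commitFuture` collapse into a single call that returns the cause, followed by one check on its message.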



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques
         assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
     }
 
+    @Test
+    public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));

Review Comment:
   Is this stubbing necessary for the test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
