kuoche1712003 commented on code in PR #19914: URL: https://github.com/apache/kafka/pull/19914#discussion_r2266029745
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -178,6 +178,11 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + if (coordinatorRequestManager.coordinator().isEmpty() && closing) { + handleClosingWithoutCoordinator(); + return EMPTY; + } + // poll when the coordinator node is known and fatal error is not present if (coordinatorRequestManager.coordinator().isEmpty()) { pendingRequests.maybeFailOnCoordinatorFatalError(); Review Comment: Perhaps it could be written like this for better readability: ```suggestion pendingRequests.maybeFailOnCoordinatorFatalError(); if(closing && pendingRequests.hasUnsentRequests()){ CommitFailedException exception = new CommitFailedException( "Failed to commit offsets: Coordinator unknown and consumer is closing"); pendingRequests.drainPendingCommits() .forEach(request -> request.future().completeExceptionally(exception)); } ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty()); } + @Test + public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); + + var commitFuture = commitRequestManager.commitAsync(offsets); + + commitRequestManager.signalClose(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + when(coordinatorRequestManager.fatalError()) + .thenReturn(Optional.of(new GroupAuthorizationException("Fatal error"))); + + 
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); + + assertTrue(commitFuture.isCompletedExceptionally()); + + ExecutionException exception = assertThrows(ExecutionException.class, commitFuture::get); + + assertInstanceOf(GroupAuthorizationException.class, exception.getCause()); + assertEquals("Fatal error", exception.getCause().getMessage()); Review Comment: You might consider using `TestUtils.assertFutureThrows` here. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty()); } + @Test + public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); Review Comment: Is it necessary to include this line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org