kirktrue commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1884655709


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -200,12 +196,12 @@ private void onFailedResponse(final long currentTimeMs, 
final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new 
ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);

Review Comment:
   Are there any edge cases that could occur if `fatalError` was already set 
when it's overwritten here? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, 
final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new 
ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", 
exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
   It's cleared in the `AbstractHeartbeatRequestManager` at this point, but I 
don't know if it should also be cleared elsewhere instead or in addition to 
`AbstractHeartbeatRequestManager.poll()`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -176,9 +176,15 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        // poll only when the coordinator node is known.
-        if (coordinatorRequestManager.coordinator().isEmpty())
+        // poll when the coordinator node is known and fatal errors are not 
present
+        if (coordinatorRequestManager.coordinator().isEmpty()) {
+            Optional<Throwable> fatalError = 
coordinatorRequestManager.fatalError();
+            if (fatalError.isPresent()) {
+                pendingRequests.unsentOffsetCommits.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+                pendingRequests.unsentOffsetFetches.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+            }
             return EMPTY;

Review Comment:
   The `CommitRequestManager` does _not_ clear the fatal error, but the 
`AbstractHeartbeatRequestManager` does. Can you explain and/or add some 
comments so that others can understand the distinction between these two cases?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -176,9 +176,15 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        // poll only when the coordinator node is known.
-        if (coordinatorRequestManager.coordinator().isEmpty())
+        // poll when the coordinator node is known and fatal errors are not 
present
+        if (coordinatorRequestManager.coordinator().isEmpty()) {
+            Optional<Throwable> fatalError = 
coordinatorRequestManager.fatalError();
+            if (fatalError.isPresent()) {
+                pendingRequests.unsentOffsetCommits.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+                pendingRequests.unsentOffsetFetches.forEach(request -> 
request.future.completeExceptionally(fatalError.get()));
+            }
             return EMPTY;

Review Comment:
   If there's an error, do we not want to also fail any 
`commitSync()`/`commitAsync()` calls? Or is that handled separately?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to