lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1876144917


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);

Review Comment:
   It seems we never clear this fatal error, and we should, right? We could clear it when the request completes or, maybe better, when a new request is generated on poll. Thoughts?
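
To make the second suggestion concrete, here is a minimal sketch of clearing the error when a new request is generated on poll. `CoordinatorManagerSketch` and all of its methods are hypothetical simplifications for illustration, not the actual `CoordinatorRequestManager` code:

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the coordinator request manager.
class CoordinatorManagerSketch {
    private Optional<Throwable> fatalError = Optional.empty();
    private boolean coordinatorKnown = false;

    // Called on every poll of the background thread.
    // Returns true if a new FindCoordinator request would be generated.
    boolean poll() {
        if (coordinatorKnown) {
            return false;
        }
        // A new request is about to go out, so the previous failure is stale.
        fatalError = Optional.empty();
        return true;
    }

    // Records a fatal failure so other managers can react to it.
    void onFailedResponse(Throwable exception) {
        fatalError = Optional.of(exception);
    }

    // A successful lookup also leaves no fatal error behind.
    void onSuccessfulResponse() {
        coordinatorKnown = true;
        fatalError = Optional.empty();
    }

    Optional<Throwable> fatalError() {
        return fatalError;
    }
}
```

With this shape, the error stays visible to other managers between the failed response and the next poll that generates a request, and is dropped as soon as a retry is attempted.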



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
             KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));

Review Comment:
   I expect that removing this here will introduce a gap we need to cover: how are we going to surface these coordinator errors from consumer.poll now (if no commit is in use)?
   I see 2 options:
   1. The coordinator mgr keeps propagating its errors via the event queue, because we do need to know about them in the app thread (from consumer.poll) without a triggering API event. In addition, the coordinator mgr exposes a `fatalError` so that other managers in the background thread can take internal actions (e.g. the commit mgr needs to cancel pending requests).
   2. The coordinator mgr no longer propagates via error event; it just exposes the coordinator found or the fatal error. Other managers that require a coordinator know what to do about it:
       - the commit request mgr uses the coordinator, or cancels all its pending requests if there is a coordinator fatal error
       - the HB mgr uses the coordinator, or propagates an error via ErrorEvent if there is a coordinator fatal error (currently the HB mgr just skips sending a HB if there is no coordinator, but it could propagate the error too)
https://github.com/apache/kafka/blob/d76238a18fb6a86b4b08bc04918ea1a0ee626517/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L164
   
   These are the only 2 managers that require a known coordinator, so I would expect we cover everything by ensuring that they handle the coordinator fatal error properly. Thoughts? (These are first thoughts, let's shape them.)
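
As a rough illustration of option 2, the sketch below shows each dependent manager reacting to an exposed coordinator fatal error on its own poll. Every class and method name here (`CoordinatorStateSketch`, `CommitManagerSketch`, `HeartbeatManagerSketch`) is hypothetical, and a plain queue of `Throwable` stands in for the background event queue; none of this is the real Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical view of what the coordinator mgr exposes: only the fatal error here.
class CoordinatorStateSketch {
    private Optional<Throwable> fatalError = Optional.empty();

    void setFatalError(Throwable error) {
        fatalError = Optional.of(error);
    }

    Optional<Throwable> fatalError() {
        return fatalError;
    }
}

// Hypothetical commit manager: cancels pending requests on a coordinator fatal error.
class CommitManagerSketch {
    private final CoordinatorStateSketch coordinatorState;
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    CommitManagerSketch(CoordinatorStateSketch coordinatorState) {
        this.coordinatorState = coordinatorState;
    }

    CompletableFuture<Void> enqueueCommit() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void poll() {
        coordinatorState.fatalError().ifPresent(error -> {
            // Fail every pending commit with the coordinator error, then drop them.
            pending.forEach(future -> future.completeExceptionally(error));
            pending.clear();
        });
    }

    int pendingCount() {
        return pending.size();
    }
}

// Hypothetical heartbeat manager: surfaces the error to the app thread via a queue.
class HeartbeatManagerSketch {
    private final CoordinatorStateSketch coordinatorState;
    private final Queue<Throwable> errorEvents;

    HeartbeatManagerSketch(CoordinatorStateSketch coordinatorState, Queue<Throwable> errorEvents) {
        this.coordinatorState = coordinatorState;
        this.errorEvents = errorEvents;
    }

    void poll() {
        // Note: this enqueues the error on every poll until the error is cleared.
        coordinatorState.fatalError().ifPresent(errorEvents::add);
    }
}
```

In this sketch the error reaches the app thread only through the heartbeat manager's queue, which is the path that would cover consumer.poll when no commit is in use.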



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.