lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1876144917
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
- backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+ fatalError = Optional.of(groupAuthorizationException);
return;
}
log.warn("FindCoordinator request failed due to fatal exception", exception);
- backgroundEventHandler.add(new ErrorEvent(exception));
+ fatalError = Optional.of(exception);
Review Comment:
It seems we're never clearing this fatal error, and we should, right? We could
consider clearing it when the request completes, or, maybe better, when a new
request is generated on poll. Thoughts?
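To make the "clear it when a new request is generated on poll" idea concrete, here is a minimal sketch. `CoordinatorFatalErrorSketch`, its `poll()`/`onFailedResponse()` methods and the `requestInFlight` flag are hypothetical simplifications, not the real `CoordinatorRequestManager` API; backoff and the actual FindCoordinator request are left out.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for CoordinatorRequestManager (not the real class).
// It only tracks the fatal error and whether a FindCoordinator request is in flight.
class CoordinatorFatalErrorSketch {

    private Optional<Throwable> fatalError = Optional.empty();
    private boolean requestInFlight = false;

    // Mirrors the diff above: a failed FindCoordinator response records a fatal error.
    void onFailedResponse(Throwable exception) {
        requestInFlight = false;
        fatalError = Optional.of(exception);
    }

    // Called on each background-thread poll. Returns true if a new FindCoordinator
    // request would be generated; generating one clears the previous fatal error.
    boolean poll() {
        if (requestInFlight) {
            return false;
        }
        fatalError = Optional.empty(); // a new lookup starts, so the old error is stale
        requestInFlight = true;
        return true;
    }

    Optional<Throwable> fatalError() {
        return fatalError;
    }

    public static void main(String[] args) {
        CoordinatorFatalErrorSketch mgr = new CoordinatorFatalErrorSketch();
        mgr.poll();
        mgr.onFailedResponse(new RuntimeException("FindCoordinator failed"));
        System.out.println(mgr.fatalError().isPresent()); // true
        mgr.poll();
        System.out.println(mgr.fatalError().isPresent()); // false
    }
}
```

Clearing on the next request (rather than on response completion) keeps the error visible to other managers until a fresh lookup actually starts.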
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -188,12 +184,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
- backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
+ fatalError = Optional.of(groupAuthorizationException);
return;
}
log.warn("FindCoordinator request failed due to fatal exception", exception);
- backgroundEventHandler.add(new ErrorEvent(exception));
Review Comment:
I expect that removing this will introduce a gap we need to cover: how will
these coordinator errors now surface from consumer.poll (if no commit is in
use)?
I see 2 options:
1. Keep the coordinator mgr propagating its errors via the event queue,
simply because we do need to know about them in the app thread (from
consumer.poll) even without a triggering API event. In addition, the
coordinator mgr exposes a `fatalError` only so that other managers in the
background thread can take internal actions (e.g. the commit mgr needs to
cancel pending requests).
2. The coordinator mgr no longer propagates via error event; it just exposes
the coordinator found or the fatal error. Other managers that require a
coordinator know what to do about it:
   - the commit request mgr uses the coordinator, or cancels all its pending
     requests if there is a coordinator fatal error
   - the HB mgr uses the coordinator, or propagates an error via ErrorEvent if
     there is a coordinator fatal error (currently the HB mgr just skips
     sending a HB if there is no coordinator, but it could propagate the error
     too):
     https://github.com/apache/kafka/blob/d76238a18fb6a86b4b08bc04918ea1a0ee626517/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L164
These are the only 2 managers that require a known coordinator, so I would
expect us to cover it all by ensuring that they handle the coordinator fatal
error properly. Thoughts? (These are first thoughts; let's shape them.)
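A minimal sketch of option 2, assuming a hypothetical `CoordinatorState` holder that the coordinator mgr fills in, with simplified stand-ins for the commit and HB managers (`CommitManagerSketch`, `HeartbeatManagerSketch`) and a plain `BlockingQueue<Throwable>` standing in for the background event queue. None of these are the real Kafka classes; actual request building and sending is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Demo entry point for the hypothetical option-2 sketch below.
class OptionTwoSketch {
    public static void main(String[] args) {
        CoordinatorState state = new CoordinatorState();
        state.fatalError = Optional.of(new RuntimeException("coordinator lookup failed"));

        CommitManagerSketch commits = new CommitManagerSketch();
        CompletableFuture<Void> commit = new CompletableFuture<>();
        commits.pending.add(commit);
        commits.poll(state);
        System.out.println(commit.isCompletedExceptionally()); // true

        BlockingQueue<Throwable> events = new LinkedBlockingQueue<>();
        new HeartbeatManagerSketch(events).poll(state);
        System.out.println(events.size()); // 1
    }
}

// Hypothetical: what the coordinator mgr exposes instead of enqueuing ErrorEvents.
class CoordinatorState {
    Optional<String> coordinator = Optional.empty();
    Optional<Throwable> fatalError = Optional.empty();
}

// Hypothetical commit mgr: on a coordinator fatal error, fail and drop all pending requests.
class CommitManagerSketch {
    final List<CompletableFuture<Void>> pending = new ArrayList<>();

    void poll(CoordinatorState state) {
        state.fatalError.ifPresent(error -> {
            pending.forEach(f -> f.completeExceptionally(error));
            pending.clear();
        });
        // With a known coordinator, pending commits would be sent here (omitted).
    }
}

// Hypothetical HB mgr: instead of silently skipping when there is no coordinator,
// it surfaces the coordinator fatal error so that consumer.poll can throw it.
class HeartbeatManagerSketch {
    private final BlockingQueue<Throwable> errorEvents;

    HeartbeatManagerSketch(BlockingQueue<Throwable> errorEvents) {
        this.errorEvents = errorEvents;
    }

    void poll(CoordinatorState state) {
        if (state.coordinator.isPresent()) {
            return; // a heartbeat would be built and sent here (omitted)
        }
        state.fatalError.ifPresent(errorEvents::add);
    }
}
```

As written, the HB sketch would enqueue the same error on every poll while `fatalError` stays set, which ties back to the question of when the coordinator mgr should clear it.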