lianetm commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2528979525
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                     asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                     AuthenticationException authenticationException = client.authenticationException(u.node.get());
                     u.handler.onFailure(currentTimeMs, authenticationException);
+                } else if (u.node.isEmpty() && onClose) {
+                    log.debug("Removing unsent request {} because the client is closing", u);
+                    iter.remove();
+                    asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
+                    u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Review Comment:
   with this we're discarding any request we may have on close, if the node is
   disconnected, right?
   Seems sensible if the node is the Coordinator, no concerns there as you
   explained (similar to what the ClassicConsumer does
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1142
   and the root cause of the console issue, from what you explained).
   But still thinking: could we be wrongly dropping other relevant requests on
   close, like close fetch sessions? (If, say, the leader is briefly
   disconnected and comes back.) The ClassicConsumer (and the AsyncConsumer
   before this PR) give those the full closeTimeout
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L129-L134
   wdyt? I could be missing something about how the flow would actually go in
   that case.
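
   To make the concern concrete, here's a minimal, self-contained sketch of the purge behavior in the diff above. All the types and names here are simplified stand-ins, not the real Kafka internals: on close, any unsent request whose node is currently empty gets dropped, including a hypothetical fetch-session close whose leader might have reconnected within the close timeout.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Optional;

   // Simplified model of the unsent-request queue; illustrative only.
   public class CloseDrainSketch {
       record UnsentRequest(String api, Optional<String> node) {}

       // Mirrors the diff's condition: on close, remove any request with no node.
       static List<UnsentRequest> purgeOnClose(List<UnsentRequest> queue) {
           List<UnsentRequest> dropped = new ArrayList<>();
           Iterator<UnsentRequest> iter = queue.iterator();
           while (iter.hasNext()) {
               UnsentRequest u = iter.next();
               if (u.node().isEmpty()) {   // no ready node while closing
                   iter.remove();
                   dropped.add(u);         // would be failed with NETWORK_EXCEPTION
               }
           }
           return dropped;
       }

       public static void main(String[] args) {
           List<UnsentRequest> queue = new ArrayList<>(List.of(
               new UnsentRequest("FIND_COORDINATOR", Optional.empty()),
               new UnsentRequest("FETCH_SESSION_CLOSE", Optional.empty()), // leader briefly gone
               new UnsentRequest("OFFSET_COMMIT", Optional.of("node-1"))));
           List<UnsentRequest> dropped = purgeOnClose(queue);
           // The fetch-session close is dropped alongside FindCoordinator,
           // even though the leader could come back within closeTimeout.
           System.out.println(dropped.size()); // 2
           System.out.println(queue.size());   // 1
       }
   }
   ```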
   If we think this could be an issue, I wonder if we could address it at the
   Coordinator level, to avoid any undesired side effects on other requests. The
   CoordinatorRequestManager knows when it's `closing`. Could we consider
   handling FindCoordinator failures differently if we know we're closing? Here
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L123
   (not retrying it anymore). Would that work?
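
   A rough sketch of that alternative, using a simplified stand-in for the real CoordinatorRequestManager (the class, fields, and method names below are assumptions for illustration, not the actual code): on failure, skip the retry when we know we're closing, so only the coordinator lookup is affected and other requests keep their normal close behavior.

   ```java
   // Illustrative stand-in; the real class is
   // o.a.k.clients.consumer.internals.CoordinatorRequestManager.
   public class CoordinatorCloseSketch {
       private boolean closing = false;
       private int retriesScheduled = 0;

       void signalClose() { closing = true; }

       // Simplified failure handler: retry FindCoordinator unless closing.
       void onFindCoordinatorFailure(RuntimeException error) {
           if (closing) {
               // On close, give up instead of scheduling another lookup,
               // so shutdown isn't held up by a coordinator we no longer need.
               return;
           }
           retriesScheduled++; // normally: record the failed attempt and backoff-retry
       }

       int retriesScheduled() { return retriesScheduled; }

       public static void main(String[] args) {
           CoordinatorCloseSketch mgr = new CoordinatorCloseSketch();
           mgr.onFindCoordinatorFailure(new RuntimeException("disconnected"));
           mgr.signalClose();
           mgr.onFindCoordinatorFailure(new RuntimeException("disconnected"));
           System.out.println(mgr.retriesScheduled()); // 1: the failure during close was not retried
       }
   }
   ```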
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]