lianetm commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2528979525


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                 asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                 AuthenticationException authenticationException = client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is closing", u);
+                iter.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());

Review Comment:
   With this we're discarding any request we may have on close if the node is disconnected, right?
   
   Seems sensible if the node is the Coordinator; no concerns there, as you explained. It's similar to what the classic consumer does (and the root cause of the console issue from what you explained):
   
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1142
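   
   (Rough shape of that classic pattern, much simplified; this is not the actual AbstractCoordinator code, and all the names here are illustrative:)
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   
   // Illustrative only: on close, pending coordinator requests are failed
   // outright instead of retried, since there's no point finding a
   // coordinator we're about to walk away from.
   public final class FailPendingOnCloseSketch {
       private final Queue<CompletableFuture<Void>> pendingCoordinatorRequests = new ArrayDeque<>();
   
       void close() {
           CompletableFuture<Void> pending;
           while ((pending = pendingCoordinatorRequests.poll()) != null) {
               pending.completeExceptionally(new RuntimeException("consumer is closing"));
           }
       }
   }
   ```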
   
   But still thinking: could we be wrongly dropping other relevant requests on close, like close fetch sessions? (If, let's say, the leader is briefly disconnected and comes back.) The ClassicConsumer (and the Async consumer before this PR) give those the full closeTimeout:
   
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L129-L134
   
   wdyt? I could be missing something on how the flow would actually go in that 
case.
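   
   To make the concern concrete, the pattern I mean is roughly this (a sketch, not the actual Fetcher code; `pendingCloseRequests` and the sleep loop are just stand-ins for polling the network client):
   
   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   
   // Illustrative only: give in-flight close requests (e.g. close fetch
   // sessions) up to the full close timeout to complete, instead of
   // failing them eagerly on a possibly transient disconnect.
   public final class CloseTimeoutSketch {
       static void awaitClose(List<CompletableFuture<Void>> pendingCloseRequests,
                              Duration closeTimeout) {
           long deadlineMs = System.currentTimeMillis() + closeTimeout.toMillis();
           while (System.currentTimeMillis() < deadlineMs
                   && pendingCloseRequests.stream().anyMatch(f -> !f.isDone())) {
               try {
                   Thread.sleep(10); // the real code would poll the network client here
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
           }
       }
   }
   ```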
   
   If we think this could be an issue, I wonder if we could address it at the Coordinator level instead, to avoid any undesired side effects on other requests. The CoordinatorRequestManager knows when it's `closing`. Could we consider handling FindCoordinator failures differently if we know we're closing (not retrying them anymore)? Here:
   
   https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L123
   
   Would that work?
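   
   Something along these lines is what I have in mind (just a sketch; `closing`, `onFindCoordinatorFailure`, and the backoff are made-up names, not the actual CoordinatorRequestManager API):
   
   ```java
   // Illustrative only: on a failed FindCoordinator response, skip the
   // retry/backoff path when we know the consumer is closing, so close()
   // isn't held up retrying lookups we no longer need.
   public final class FindCoordinatorOnCloseSketch {
       private final long retryBackoffMs = 100L; // illustrative backoff
       private boolean closing;
       private long nextFindCoordinatorAttemptMs = Long.MAX_VALUE;
   
       void markClosing() {
           closing = true;
       }
   
       void onFindCoordinatorFailure(long currentTimeMs, Throwable cause) {
           if (closing) {
               // Closing: surface the failure and don't schedule another lookup.
               return;
           }
           // Normal path: back off and retry the coordinator lookup.
           nextFindCoordinatorAttemptMs = currentTimeMs + retryBackoffMs;
       }
   }
   ```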


