ShivsundarR commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2530061855


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                 asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                 AuthenticationException authenticationException = client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is closing", u);
+                iter.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());

Review Comment:
   Hi @lianetm, thanks for the review.
   
   > with this we're discarding any request we may have on close , if the node is disconnected, right?
   
   Not really. We still allow time for all pending requests (like `commitSync`, the acknowledgement callbacks, or `findCoordinator`) to complete. `onClose` is true only once we have finished waiting in those steps and reached the step that closes the network thread itself.
   
https://github.com/apache/kafka/blob/0f4dbf7fd76d0fb29fec1e3588399d67628d278f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L979-L981
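   A minimal sketch of what the new branch does, assuming a simplified model: `UnsentQueueSketch`, the `Unsent` record, and the `"NETWORK_EXCEPTION"` string are hypothetical stand-ins, not Kafka's `NetworkClientDelegate` types. It only shows that a request with no node is failed and removed when `onClose` is true, and is left untouched otherwise.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical, simplified model of the unsent-request queue; not Kafka's real classes.
class UnsentQueueSketch {

    // An unsent request whose target node may be unknown (e.g. FindCoordinator with no live broker).
    record Unsent(String name, Optional<String> node, Consumer<String> onFailure) { }

    // Mirrors the shape of the new branch: when closing, fail and drop requests that never got a node.
    static List<String> checkDisconnects(ArrayDeque<Unsent> unsent, boolean onClose) {
        List<String> dropped = new ArrayList<>();
        Iterator<Unsent> iter = unsent.iterator();
        while (iter.hasNext()) {
            Unsent u = iter.next();
            if (u.node().isEmpty() && onClose) {
                iter.remove();                           // remove from the queue first
                u.onFailure().accept("NETWORK_EXCEPTION"); // then complete the handler with a failure
                dropped.add(u.name());
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        ArrayDeque<Unsent> queue = new ArrayDeque<>();
        List<String> failures = new ArrayList<>();
        queue.add(new Unsent("findCoordinator", Optional.empty(), e -> failures.add("findCoordinator:" + e)));
        queue.add(new Unsent("fetch", Optional.of("broker-1"), e -> failures.add("fetch:" + e)));

        System.out.println(checkDisconnects(queue, false)); // [] : nothing dropped while not closing
        System.out.println(checkDisconnects(queue, true));  // [findCoordinator]
        System.out.println(queue.size());                   // 1 : the request with a node is kept
        System.out.println(failures);                       // [findCoordinator:NETWORK_EXCEPTION]
    }
}
```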
   
   
   --------------
   
   > But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions?
   
   Again, as mentioned above and in this comment (https://github.com/apache/kafka/pull/19886#issuecomment-2965330373), the boolean `onClose` is only true once we have completed all the required steps (updating callbacks, closing out sessions, sending `stopFindCoordinatorOnCloseEvent`, etc.).
   So at this stage there are no other requests we would be waiting for. Any pending requests have either had their timers expire or received a "broker disconnected" response; that is why this stage of closing the network thread was reached.
   
   So even if a broker came back up now and responded to a previous request (say, a commit response), we would no longer update callbacks (or emit any background events), because the application thread has finished all processing and reached the stage of closing the application event handler itself.
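   The ordering argument can be illustrated with a toy model, assuming each close step either completes or times out against one shared deadline. `CloseOrderSketch`, its step names, and the millisecond values are hypothetical illustrations, not the `ShareConsumerImpl` code. It shows that the network thread is closed with `onClose = true` only after every step has finished, and that a response arriving afterwards triggers no callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the close ordering; step names and timings are illustrative only.
class CloseOrderSketch {

    private final List<String> events = new ArrayList<>();
    private boolean handlerClosed = false;

    // A close step either completes before the shared deadline or is abandoned when it expires.
    void runStep(String step, long completesAtMs, long deadlineMs) {
        events.add(step + (completesAtMs <= deadlineMs ? ":completed" : ":timed-out"));
    }

    // Runs only after all steps above: closing the handler closes the network thread with onClose = true.
    void closeApplicationEventHandler() {
        handlerClosed = true;
        events.add("network-thread:onClose=true");
    }

    // A response that arrives once the handler is closed no longer updates callbacks.
    void onResponse(String response) {
        events.add(response + (handlerClosed ? ":ignored" : ":callback"));
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        CloseOrderSketch c = new CloseOrderSketch();
        long deadline = 30_000;
        c.runStep("commitSync", 500, deadline);
        c.runStep("ackCallbacks", 40_000, deadline);
        c.runStep("stopFindCoordinator", 100, deadline);
        c.closeApplicationEventHandler();
        c.onResponse("commitResponse"); // broker came back too late
        System.out.println(c.events());
    }
}
```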
   
   ----------------
   
   > The ClassicConsumer (and Async before this PR) give those the full closeTimeout
   
   The `ClassicConsumer` does give them the full `closeTimeout`, but only if the coordinator isn't null.
   
   
   
https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1143-L1147
   
   I tried this out locally, and if we shut down the broker first, the coordinator here is null, so the consumer closes immediately.
   I also got this log when the consumer closed, so there is a check for disconnected nodes there too.
   
   
https://github.com/apache/kafka/blob/0f4dbf7fd76d0fb29fec1e3588399d67628d278f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L406-L408
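   The `ClassicConsumer` behaviour described above reduces to a tiny hypothetical model, assuming (as in the local test) that the broker is down so pending coordinator requests never complete. `ClassicCloseSketch` and `simulatedCloseWaitMs` are illustrative names, not Kafka APIs.

```java
import java.util.Optional;

// Hypothetical model: wait for pending coordinator requests only when a coordinator is known.
class ClassicCloseSketch {

    // Simulated time (ms) close spends waiting, assuming pending requests never complete.
    static long simulatedCloseWaitMs(Optional<String> coordinator, long closeTimeoutMs) {
        if (coordinator.isEmpty()) {
            return 0L;           // coordinator unknown: pending requests are not awaited
        }
        return closeTimeoutMs;   // coordinator known: wait up to the full close timeout
    }

    public static void main(String[] args) {
        System.out.println(simulatedCloseWaitMs(Optional.empty(), 30_000));         // 0
        System.out.println(simulatedCloseWaitMs(Optional.of("broker-1"), 30_000)); // 30000
    }
}
```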
   
   -------------------
   
   > Would that work? (and by "closing" I refer to the actual var inside the CoordReqMgr, that is only flipped after committing)
   
   - I don't think so, as this variable is set to true only on receiving `StopFindCoordinatorOnCloseEvent`.
   - But if a broker was shut down before the client, then for both `AsyncConsumer` and `ShareConsumer` the unsent `findCoordinator` request (to a "null" node) still lingers in `NetworkClientDelegate` and is retried until the timeout, so `stopFindCoordinatorOnCloseEvent` has no effect.
   
   So `whenComplete` would be reached only after the 30-second timeout, which would not help terminate the consumer as soon as we hit Ctrl-C.
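   To put numbers on that delay, here is a hypothetical timeline model. `LingeringFindCoordinatorSketch`, the 100 ms poll interval, and the 1 s close time are assumptions for illustration, not Kafka code. The queued `findCoordinator` request has no node, so it is never sent and only finishes when its request timer expires, unless the new `onClose` branch drops it when the network thread closes.

```java
// Hypothetical timeline model: when does the lingering FindCoordinator request complete?
class LingeringFindCoordinatorSketch {

    // Simulates polls every pollIntervalMs starting at t = 0. The request is never sent because it
    // has no node; it completes either when its timer expires or, with the new branch, at closeAtMs.
    static long completionTimeMs(long requestTimeoutMs, long closeAtMs, long pollIntervalMs,
                                 boolean dropNodelessOnClose) {
        if (pollIntervalMs <= 0) {
            throw new IllegalArgumentException("pollIntervalMs must be positive");
        }
        for (long now = 0; ; now += pollIntervalMs) {
            if (dropNodelessOnClose && now >= closeAtMs) {
                return now; // failed with NETWORK_EXCEPTION and removed on close
            }
            if (now >= requestTimeoutMs) {
                return now; // failed by its own request timer
            }
            // Per the comment above, stopFindCoordinatorOnCloseEvent does not remove this queued request.
        }
    }

    public static void main(String[] args) {
        System.out.println(completionTimeMs(30_000, 1_000, 100, false)); // 30000
        System.out.println(completionTimeMs(30_000, 1_000, 100, true));  // 1000
    }
}
```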
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
