kirktrue commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2536041299
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
AuthenticationException authenticationException =
client.authenticationException(u.node.get());
u.handler.onFailure(currentTimeMs, authenticationException);
+ } else if (u.node.isEmpty() && onClose) {
+ log.debug("Removing unsent request {} because the client is
closing", u);
+ iter.remove();
+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
+ u.handler.onFailure(currentTimeMs,
Errors.NETWORK_EXCEPTION.exception());
Review Comment:
I had another minor concern regarding the handling in
`CoordinatorRequestManager` of the `NetworkException` that's passed to
`onFailure()`. How does the `CoordinatorRequestManager` logic handle that error?
For example, does this result in potentially misleading logging? For
example, in `CoordinatorRequestManager.markCoordinatorUnknown()`, there is some
logging that states that `Rediscovery will be attempted`, which isn't really
true.
Would it be better to pass a different exception type to `onFailure()` that
we know the `CoordinatorRequestManager` will interpret correctly in this
special case?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
AuthenticationException authenticationException =
client.authenticationException(u.node.get());
u.handler.onFailure(currentTimeMs, authenticationException);
+ } else if (u.node.isEmpty() && onClose) {
+ log.debug("Removing unsent request {} because the client is
closing", u);
+ iter.remove();
+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
+ u.handler.onFailure(currentTimeMs,
Errors.NETWORK_EXCEPTION.exception());
Review Comment:
IIUC, the logic in this PR is assuming the `UnsentRequest` represents a
`FIND_COORDINATOR` RPC because it has an empty `Node`. I was curious if we
could make the check a little more explicit, for example, in `UnsentRequest`:
```java
public boolean isFindCoordinatorRequest() {
return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
}
```
And then the code here becomes more clear:
```java
} else if (u.isFindCoordinatorRequest() && onClose) {
log.debug("Removing unsent request {} because the client is
closing", u);
iter.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
u.handler.onFailure(currentTimeMs,
Errors.NETWORK_EXCEPTION.exception());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]