divijvaidya commented on code in PR #12070:
URL: https://github.com/apache/kafka/pull/12070#discussion_r856938195
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:
##########
@@ -490,17 +491,11 @@ long trySend(long now) {
// send any requests that can be sent now
for (Node node : unsent.nodes()) {
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
- if (iterator.hasNext())
+ if (iterator.hasNext()) {
Review Comment:
please add a comment why are we sending only one request per node here
(perhaps, same as the one that you described in the JIRA). It will make reading
the code easier for future readers.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:
##########
@@ -283,6 +283,7 @@ public void poll(Timer timer, PollCondition pollCondition,
boolean disableWakeup
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(timer.currentTimeMs());
+ client.poll(0, timer.currentTimeMs());
Review Comment:
1. The 0 timeout here does not respect the conditions
`(pendingCompletion.isEmpty() && (pollCondition == null ||
pollCondition.shouldBlock()))`, especially the caller's criteria for
`shouldBlock`.
2. You probably need to `checkDisconnects` after this poll otherwise the
requests failing with disconnects during this poll will never have the callback
triggered and they may be cleaned up from the unsent map at the end of the
function without having their callback triggered.
Since this second attempt to trySend here is sort of a retry, I would
recommend to move line 255-281 in a function and call that function over here
again.
If my points 1 and 2 are correct, please check why no tests failed even
though we had bugs in the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]