ShivsundarR commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1698631496
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -207,46 +212,32 @@ public void fetch(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap) {
}
/**
- * Process acknowledgeRequestStates and prepares a list of
acknowledgements to be sent in the poll().
+ * Process acknowledgeRequestStates and prepares a list of
acknowledgements to
+ * be sent in the poll().
*
* @param currentTimeMs the current time in ms.
*
* @return the PollResult containing zero or more acknowledgements.
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
- Iterator<AcknowledgeRequestState> iterator =
acknowledgeRequestStates.iterator();
- while (iterator.hasNext()) {
- AcknowledgeRequestState acknowledgeRequestState = iterator.next();
- if (acknowledgeRequestState.isProcessed()) {
- iterator.remove();
- } else if (!acknowledgeRequestState.maybeExpire()) {
- if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
- log.trace("Skipping acknowledge request because previous
request to {} has not been processed", acknowledgeRequestState.nodeId);
- } else {
- if (acknowledgeRequestState.canSendRequest(currentTimeMs))
{
- acknowledgeRequestState.onSendAttempt(currentTimeMs);
- UnsentRequest request =
acknowledgeRequestState.buildRequest(currentTimeMs);
- if (request != null) {
- unsentRequests.add(request);
- }
- }
- }
- } else {
- // Fill in TimeoutException
- for (TopicIdPartition tip :
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.REQUEST_TIMED_OUT);
- }
- iterator.remove();
- }
+ for (Pair<AcknowledgeRequestState> requestStates :
acknowledgeRequestStates.values()) {
+ isAsyncDone = false;
+ // For commitAsync
+ maybeBuildRequest(requestStates.getFirst(), currentTimeMs,
true).ifPresent(unsentRequests::add);
Review Comment:
Yeah makes sense, I have updated now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]