ShivsundarR commented on code in PR #20838:
URL: https://github.com/apache/kafka/pull/20838#discussion_r2513670528
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -676,15 +688,27 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition,
NodeAcknowledgements> ack
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
return fetch;
+ } else if (currentFetch.hasRenewals()) {
+ // First, take any records which have been renewed and move them
back into in-flight records.
+ currentFetch.takeRenewedRecords();
+
+ // If some records are in renewing state...
+ if (currentFetch.hasRenewals()) {
+ // We only send one ShareFetchEvent per poll call.
+ if (shouldSendShareFetchEvent) {
+ // Check for any acknowledgements which could have come
from control records (GAP) and include them.
Review Comment:
Minor Nit(unrelated to this change): this comment does not hold true here,
the control record acknowledgements are sent in the previous condition as a
`ShareAcknowledgeAsyncEvent`.
This should have been changed as part of this
PR(https://github.com/apache/kafka/pull/20794) itself in line 680, I had missed
it there.
Can we change the comment in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]