ShivsundarR commented on code in PR #20838:
URL: https://github.com/apache/kafka/pull/20838#discussion_r2513670528


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -676,15 +688,27 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, 
NodeAcknowledgements> ack
                 sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
             }
             return fetch;
+        } else if (currentFetch.hasRenewals()) {
+            // First, take any records which have been renewed and move them 
back into in-flight records.
+            currentFetch.takeRenewedRecords();
+
+            // If some records are in renewing state...
+            if (currentFetch.hasRenewals()) {
+                // We only send one ShareFetchEvent per poll call.
+                if (shouldSendShareFetchEvent) {
+                    // Check for any acknowledgements which could have come 
from control records (GAP) and include them.

Review Comment:
   Minor Nit(unrelated to this change): this comment does not hold true here, 
the control record acknowledgements are sent in the previous condition as a 
`ShareAcknowledgeAsyncEvent`. 
   This should have been changed as part of this 
PR(https://github.com/apache/kafka/pull/20794) itself in line 680, I had missed 
it there.
   Can we change the comment in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to