AndrewJSchofield commented on code in PR #20838:
URL: https://github.com/apache/kafka/pull/20838#discussion_r2514234228


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -676,15 +688,27 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
                 sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
             }
             return fetch;
+        } else if (currentFetch.hasRenewals()) {
+            // First, take any records which have been renewed and move them back into in-flight records.
+            currentFetch.takeRenewedRecords();
+
+            // If some records are in renewing state...
+            if (currentFetch.hasRenewals()) {
+                // We only send one ShareFetchEvent per poll call.
+                if (shouldSendShareFetchEvent) {
+                    // Check for any acknowledgements which could have come from control records (GAP) and include them.

Review Comment:
   I've changed this code around a bit. Please re-review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.