AndrewJSchofield commented on code in PR #20838:
URL: https://github.com/apache/kafka/pull/20838#discussion_r2514234228
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -676,15 +688,27 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
return fetch;
+ } else if (currentFetch.hasRenewals()) {
+ // First, take any records which have been renewed and move them back into in-flight records.
+ currentFetch.takeRenewedRecords();
+
+ // If some records are in renewing state...
+ if (currentFetch.hasRenewals()) {
+ // We only send one ShareFetchEvent per poll call.
+ if (shouldSendShareFetchEvent) {
+ // Check for any acknowledgements which could have come from control records (GAP) and include them.
Review Comment:
I've changed this code around a bit. Please re-review.