ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
         if (currentFetch.isEmpty()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from control records (GAP) and include them.
+                Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+                combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
                 // Fetch more records and send any waiting acknowledgements
-                applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+                applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));
Review Comment:
Kind of :)) So it would have empty records but could have non-empty
acknowledgements (for skipped records).
- Some integration tests in this PR - https://github.com/apache/kafka/pull/19261
revealed that, with transactions, when the client receives only a control
record (e.g. an abort marker) in the `ShareFetchResponse`, with no non-control
records, the control records in the `ShareCompletedFetch` are never
acknowledged and never presented to the consumer application. Ideally they
would be acknowledged with GAP, indicating that the client is ignoring them.
- It is expected that control records are skipped and not presented to the
application, so they never reach the application thread, but the client
should still acknowledge them with GAP
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
- Normally these control records are auto-acknowledged with `GAP`, and the
acknowledgement is sent on the next `ShareFetch`/`ShareAcknowledge` request.
But `fetch.isEmpty()` only checks `numRecords() == 0`, so when the fetch
contains no records we ignore the fetch entirely here (meaning we never
acknowledge these control records) -
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
- After this PR, any acknowledgements that came in with the empty fetch
(from control records) are added to the `ShareFetchEvent` so that they can be
sent on the next `poll()`.
- We cannot present these to the application, so the check for
`fetch.isEmpty()` cannot be altered. But yeah, there is a case where this can
happen. I agree it looks a bit odd for readability though.
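
To make the case above concrete, here is a minimal, self-contained sketch of the behaviour. It does not use the real Kafka classes: `SketchFetch`, `addGap`, `acknowledgementsToSend`, and the string keys and values are all hypothetical stand-ins for `ShareFetch`, `TopicIdPartition` and `NodeAcknowledgements`, and only `isEmpty()` and `takeAcknowledgedRecords()` mirror names from the diff. It assumes that taking the acknowledgements also clears them from the fetch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model only; none of these types are the real Kafka classes.
class EmptyFetchAckSketch {

    /** Hypothetical stand-in for ShareFetch: records plus auto-acknowledgements. */
    static final class SketchFetch {
        private final List<String> records = new ArrayList<>();
        // Key stands in for TopicIdPartition, value for NodeAcknowledgements.
        private final Map<String, String> acknowledgements = new LinkedHashMap<>();

        void addRecord(String value) {
            records.add(value);
        }

        // Models a skipped control record that is auto-acknowledged as GAP.
        void addGap(String partition, long offset) {
            acknowledgements.put(partition, "GAP@" + offset);
        }

        // As described in the comment: only the record count is checked.
        boolean isEmpty() {
            return records.isEmpty();
        }

        // Assumption: returns the acknowledgements and clears them from the fetch.
        Map<String, String> takeAcknowledgedRecords() {
            Map<String, String> taken = new LinkedHashMap<>(acknowledgements);
            acknowledgements.clear();
            return taken;
        }
    }

    /**
     * Builds the acknowledgements to send with the next fetch request.
     * For an empty fetch, acknowledgements carried by the fetch (for example
     * GAP for control records) are merged in instead of being dropped.
     * Note: putAll replaces an entry with the same key; this sketch does not
     * model merging acknowledgements within one partition.
     */
    static Map<String, String> acknowledgementsToSend(Map<String, String> pending,
                                                      SketchFetch fetch) {
        Map<String, String> combined = new LinkedHashMap<>(pending);
        if (fetch.isEmpty()) {
            combined.putAll(fetch.takeAcknowledgedRecords());
        }
        return combined;
    }

    public static void main(String[] args) {
        // A fetch that held only a control record: no records, one GAP.
        SketchFetch fetch = new SketchFetch();
        fetch.addGap("topicA-0", 7L);

        Map<String, String> pending = new LinkedHashMap<>();
        pending.put("topicA-1", "ACCEPT@3");

        System.out.println("fetch.isEmpty() = " + fetch.isEmpty());
        System.out.println("to send = " + acknowledgementsToSend(pending, fetch));
    }
}
```

In this model the fetch still reports `isEmpty()` as true, so nothing is presented to the application, while the GAP entry is carried along with the acknowledgements that were already waiting.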