AndrewJSchofield commented on code in PR #17824:
URL: https://github.com/apache/kafka/pull/17824#discussion_r1844107332
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -566,8 +573,11 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
     for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
         Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
-        if (fetchAcknowledgementsMap.get(tip) != null) {
-            acknowledgements.merge(fetchAcknowledgementsMap.remove(tip));
+        Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
+
+        if (acksFromShareFetch != null) {
+            acknowledgements.merge(acksFromShareFetch);
+            fetchAcknowledgementsInFlight.put(tip, acksFromShareFetch);
Review Comment:
It doesn't seem necessary to put the acknowledgements into the
`fetchAcknowledgementsInFlight` map in this case. We are closing the share
session using a `ShareAcknowledge` and this code is just picking up any
acknowledgements which were intended to be piggybacked on the next fetch. They
have essentially been taken over by this logic and don't need to be accounted
for as fetch acknowledgements in flight.
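
A minimal sketch of what this suggestion could look like, using simplified stand-in types rather than the real Kafka classes. Here `CloseAckSketch`, the `String` partition keys and the `List<Long>` acknowledgement lists are assumptions made for illustration only; the two map names and `acknowledgeOnClose` mirror the diff above. The pending fetch acknowledgements are merged into the closing request and nothing is recorded as in flight:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the close path; not the actual Kafka code.
class CloseAckSketch {
    // Acknowledgements waiting to be piggybacked on the next ShareFetch.
    final Map<String, List<Long>> fetchAcknowledgementsToSend = new HashMap<>();
    // Acknowledgements already sent on a ShareFetch and awaiting a response.
    final Map<String, List<Long>> fetchAcknowledgementsInFlight = new HashMap<>();

    // Builds the per-partition acknowledgements for the closing ShareAcknowledge.
    Map<String, List<Long>> acknowledgeOnClose(List<String> sessionPartitions,
                                               Map<String, List<Long>> acknowledgementsMap) {
        Map<String, List<Long>> toSend = new HashMap<>();
        for (String tip : sessionPartitions) {
            List<Long> acknowledgements =
                new ArrayList<>(acknowledgementsMap.getOrDefault(tip, List.of()));
            List<Long> acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
            if (acksFromShareFetch != null) {
                // Taken over by the close path: merged, but not tracked as in flight.
                acknowledgements.addAll(acksFromShareFetch);
            }
            toSend.put(tip, acknowledgements);
        }
        return toSend;
    }
}
```

In this sketch `fetchAcknowledgementsInFlight` is left untouched by the close path, so only acknowledgements that were actually sent on a `ShareFetch` would ever appear in it.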