AndrewJSchofield commented on code in PR #17824:
URL: https://github.com/apache/kafka/pull/17824#discussion_r1844107332


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -566,8 +573,11 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
                 for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
                     Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
 
-                    if (fetchAcknowledgementsMap.get(tip) != null) {
-                        acknowledgements.merge(fetchAcknowledgementsMap.remove(tip));
+                    Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
+
+                    if (acksFromShareFetch != null) {
+                        acknowledgements.merge(acksFromShareFetch);
+                        fetchAcknowledgementsInFlight.put(tip, acksFromShareFetch);

Review Comment:
   It doesn't seem necessary to put the acknowledgements into the
`fetchAcknowledgementsInFlight` map in this case. We are closing the share
session using a `ShareAcknowledge`, and this code is just picking up any
acknowledgements that were intended to be piggybacked on the next fetch. They
have essentially been taken over by this logic, so they don't need to be
accounted for as fetch acknowledgements in flight.
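
   A minimal sketch of what this suggestion could look like, not the actual
Kafka code. It assumes simplified stand-ins: `Acks` is a hypothetical
replacement for `Acknowledgements`, partitions are plain strings rather than
`TopicIdPartition`, and `buildCloseAcknowledgements` is a hypothetical helper
that mirrors the loop in `acknowledgeOnClose`. The pending fetch
acknowledgements are merged into the close acknowledgements and nothing is
added to the in-flight map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CloseAckSketch {
    // Hypothetical stand-in for Kafka's Acknowledgements; only merging is modelled.
    static final class Acks {
        final List<Long> offsets = new ArrayList<>();

        static Acks of(long... offs) {
            Acks acks = new Acks();
            for (long offset : offs) {
                acks.offsets.add(offset);
            }
            return acks;
        }

        void merge(Acks other) {
            offsets.addAll(other.offsets);
        }
    }

    // Acknowledgements waiting to be piggybacked on the next fetch.
    final Map<String, Acks> fetchAcknowledgementsToSend = new HashMap<>();
    // Acknowledgements already sent on a fetch and awaiting a response.
    final Map<String, Acks> fetchAcknowledgementsInFlight = new HashMap<>();

    // Hypothetical helper mirroring the loop in acknowledgeOnClose.
    Map<String, Acks> buildCloseAcknowledgements(List<String> sessionPartitions,
                                                 Map<String, Acks> acknowledgementsMap) {
        Map<String, Acks> closeAcks = new HashMap<>();
        for (String tip : sessionPartitions) {
            Acks acknowledgements = acknowledgementsMap.getOrDefault(tip, new Acks());

            // Take over anything that was queued for the next fetch.
            Acks acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
            if (acksFromShareFetch != null) {
                // Merge only. The close path now owns these acknowledgements,
                // so fetchAcknowledgementsInFlight is deliberately left untouched.
                acknowledgements.merge(acksFromShareFetch);
            }
            closeAcks.put(tip, acknowledgements);
        }
        return closeAcks;
    }

    public static void main(String[] args) {
        CloseAckSketch sketch = new CloseAckSketch();
        sketch.fetchAcknowledgementsToSend.put("topic-0", Acks.of(5L, 6L));

        Map<String, Acks> onClose = new HashMap<>();
        onClose.put("topic-0", Acks.of(1L));

        Map<String, Acks> closeAcks =
            sketch.buildCloseAcknowledgements(List.of("topic-0", "topic-1"), onClose);
        System.out.println("topic-0 close acks: " + closeAcks.get("topic-0").offsets);
        System.out.println("in-flight entries: " + sketch.fetchAcknowledgementsInFlight.size());
    }
}
```

   In this sketch the single `remove(tip)` call both checks for and claims the
pending acknowledgements, which is the same pattern the updated diff uses.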



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.