AndrewJSchofield commented on code in PR #17537:
URL: https://github.com/apache/kafka/pull/17537#discussion_r1812494263


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,14 +174,53 @@ public PollResult poll(long currentTimeMs) {
                     metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                 }
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                fetchedPartitions.add(tip);
 
-                log.debug("Added fetch request for partition {} to node {}", partition, node.id());
+                log.debug("Added fetch request for partition {} to node {}", tip, node.id());
             }
         }
 
+        // Map storing the list of partitions to forget in the upcoming request.
+        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>();
+        Cluster cluster = metadata.fetch();
+        // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription.
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                if (nodesWithPendingRequests.contains(node.id())) {
+                    log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id());
+                } else {
+                    for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+                        if (!fetchedPartitions.contains(tip)) {
+                            Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
+                            if (acknowledgementsToSend != null) {
+                                metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                            }
+                            sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
+                            partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
+                            partitionsToForgetMap.get(node).add(tip);
+
+                            forgottenTopicNames.putIfAbsent(tip.topicId(), tip.topic());
+                            fetchedPartitions.add(tip);
+                            log.debug("Added fetch request for partition {} to node {}", tip, node.id());
+                        }
+                    }
+                }
+            }
+        });
+
         Map<Node, ShareFetchRequest.Builder> builderMap = new LinkedHashMap<>();
         for (Map.Entry<Node, ShareSessionHandler> entry : handlerMap.entrySet()) {
             builderMap.put(entry.getKey(), entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+            Node node = entry.getKey();
+            ShareFetchRequestData data = builderMap.get(entry.getKey()).data();
+            if (partitionsToForgetMap.containsKey(node)) {
+                if (data.forgottenTopicsData() == null) {
+                    data.setForgottenTopicsData(new ArrayList<>());
+                }
+                ShareFetchRequest.Builder.updateForgottenData(partitionsToForgetMap.get(node), data);

Review Comment:
   I'd prefer this to be a method on the `ShareFetchRequest.Builder` instance, 
rather than a static method. Look at 
`FetchRequest.Builder.addToForgottenTopicMap`.
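
   A minimal sketch of the instance-method shape suggested here. The types and names (`Tip`, `SketchBuilder`, `addForgotten`) are hypothetical stand-ins for illustration, not the real `ShareFetchRequest.Builder` API or the Kafka request data classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ForgottenTopicsBuilderSketch {

    // Hypothetical stand-in for TopicIdPartition: a topic ID plus a partition index.
    static final class Tip {
        final String topicId;
        final int partition;

        Tip(String topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    // Hypothetical builder: the forgotten-partition state lives on the instance,
    // so callers never reach into the request data themselves.
    static final class SketchBuilder {
        private final Map<String, List<Integer>> forgotten = new LinkedHashMap<>();

        // Instance method: groups the partitions to forget by topic ID.
        SketchBuilder addForgotten(List<Tip> toForget) {
            for (Tip tip : toForget) {
                forgotten.computeIfAbsent(tip.topicId, id -> new ArrayList<>()).add(tip.partition);
            }
            return this;
        }

        Map<String, List<Integer>> forgotten() {
            return forgotten;
        }
    }

    public static void main(String[] args) {
        SketchBuilder builder = new SketchBuilder()
                .addForgotten(List.of(new Tip("topic-a", 0), new Tip("topic-a", 1), new Tip("topic-b", 0)));
        System.out.println(builder.forgotten()); // prints {topic-a=[0, 1], topic-b=[0]}
    }
}
```

   With a shape like this, the call site in `poll` could shrink to a single call on the builder, and the null check on the forgotten-topics list would move inside the builder rather than sitting in the request manager.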



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -594,7 +635,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
                     topicResponse.partitions().forEach(partition ->
                             responseData.put(new TopicIdPartition(topicResponse.topicId(),
                                     partition.partitionIndex(),
-                                    metadata.topicNames().get(topicResponse.topicId())), partition)));
+                                    metadata.topicNames().getOrDefault(topicResponse.topicId(), forgottenTopicNames.remove(topicResponse.topicId()))), partition)));

Review Comment:
   This seems wrong to me. Each partition for a forgotten topic will cause 
`forgottenTopicNames.remove()` to be called, and only the first call will 
return the topic name; subsequent calls return `null`. The handling of the 
forgotten topic names needs to be a bit less clever.
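
   The problem can be reproduced with plain maps. The sketch below uses `String` keys in place of topic IDs, and the class and method names are hypothetical. `resolveWithRemove` mirrors the lookup in the diff; `resolveOncePerTopic` is just one possible alternative, not necessarily the fix the PR should adopt. Java evaluates the default argument of `getOrDefault` eagerly, so `remove()` runs on every call whether or not the default is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ForgottenTopicNamesDemo {

    // Mirrors the lookup in the diff: remove() is evaluated for every partition,
    // so only the first partition of a forgotten topic gets a name.
    static List<String> resolveWithRemove(Map<String, String> metadataNames,
                                          Map<String, String> forgottenNames,
                                          String topicId, int partitionCount) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            names.add(metadataNames.getOrDefault(topicId, forgottenNames.remove(topicId)));
        }
        return names;
    }

    // One possible alternative (an assumption, not the PR's fix): resolve the name
    // once per topic, reuse it for every partition, then clean up.
    static List<String> resolveOncePerTopic(Map<String, String> metadataNames,
                                            Map<String, String> forgottenNames,
                                            String topicId, int partitionCount) {
        String name = metadataNames.get(topicId);
        if (name == null) {
            name = forgottenNames.get(topicId);
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            names.add(name);
        }
        forgottenNames.remove(topicId);
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> forgotten = new HashMap<>();
        forgotten.put("id-1", "orders");
        // prints [orders, null, null]
        System.out.println(resolveWithRemove(new HashMap<>(), forgotten, "id-1", 3));

        forgotten.put("id-1", "orders");
        // prints [orders, orders, orders]
        System.out.println(resolveOncePerTopic(new HashMap<>(), forgotten, "id-1", 3));
    }
}
```

   The eager evaluation has a second consequence: when the topic is still present in the metadata, the entry in the forgotten-names map is removed anyway, even though its value is never used.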


