ShivsundarR commented on code in PR #17537:
URL: https://github.com/apache/kafka/pull/17537#discussion_r1815384317
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,14 +174,53 @@ public PollResult poll(long currentTimeMs) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);
+ fetchedPartitions.add(tip);
- log.debug("Added fetch request for partition {} to node {}",
partition, node.id());
+ log.debug("Added fetch request for partition {} to node {}",
tip, node.id());
}
}
+ // Map storing the list of partitions to forget in the upcoming
request.
+ Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new
HashMap<>();
+ Cluster cluster = metadata.fetch();
+ // Iterating over the session handlers to see if there are
acknowledgements to be sent for partitions
+ // which are no longer part of the current subscription.
+ sessionHandlers.forEach((nodeId, sessionHandler) -> {
+ Node node = cluster.nodeById(nodeId);
+ if (node != null) {
+ if (nodesWithPendingRequests.contains(node.id())) {
+ log.trace("Skipping fetch because previous fetch request
to {} has not been processed", node.id());
+ } else {
+ for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
+ if (!fetchedPartitions.contains(tip)) {
+ Acknowledgements acknowledgementsToSend =
fetchAcknowledgementsMap.get(tip);
+ if (acknowledgementsToSend != null) {
+
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+ }
+ sessionHandler.addPartitionToFetch(tip,
acknowledgementsToSend);
+ partitionsToForgetMap.putIfAbsent(node, new
ArrayList<>());
+ partitionsToForgetMap.get(node).add(tip);
+
+ forgottenTopicNames.putIfAbsent(tip.topicId(),
tip.topic());
+ fetchedPartitions.add(tip);
+ log.debug("Added fetch request for partition {} to
node {}", tip, node.id());
+ }
+ }
+ }
+ }
+ });
+
Map<Node, ShareFetchRequest.Builder> builderMap = new
LinkedHashMap<>();
for (Map.Entry<Node, ShareSessionHandler> entry :
handlerMap.entrySet()) {
builderMap.put(entry.getKey(),
entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+ Node node = entry.getKey();
+ ShareFetchRequestData data = builderMap.get(entry.getKey()).data();
+ if (partitionsToForgetMap.containsKey(node)) {
+ if (data.forgottenTopicsData() == null) {
+ data.setForgottenTopicsData(new ArrayList<>());
+ }
+
ShareFetchRequest.Builder.updateForgottenData(partitionsToForgetMap.get(node),
data);
Review Comment:
Right okay, I have updated this now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]