AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934149229
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
k -> sessionHandlers.computeIfAbsent(node.id(), n ->
new ShareSessionHandler(logContext, n, memberId)));
TopicIdPartition tip = new TopicIdPartition(topicId,
partition);
- Acknowledgements acknowledgementsToSend =
fetchAcknowledgementsToSend.remove(tip);
- if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
- fetchAcknowledgementsInFlight.put(tip,
acknowledgementsToSend);
+ Acknowledgements acknowledgementsToSend = null;
+ Map<TopicIdPartition, Acknowledgements>
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+ if (nodeAcknowledgementsMap != null) {
+ acknowledgementsToSend =
nodeAcknowledgementsMap.remove(tip);
+ if (acknowledgementsToSend != null) {
+
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new
HashMap<>()).put(tip, acknowledgementsToSend);
+ }
}
+
handler.addPartitionToFetch(tip, acknowledgementsToSend);
- fetchedPartitions.add(tip);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(),
tip.partition()), tip.topic());
log.debug("Added fetch request for partition {} to node {}",
tip, node.id());
}
}
- // Map storing the list of partitions to forget in the upcoming
request.
- Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new
HashMap<>();
+
+ // Iterate over the session handlers to see if there are
acknowledgements to be sent for partitions
+ // which are no longer part of the current subscription, or whose
records were fetched from a
+ // previous leader.
Cluster cluster = metadata.fetch();
- // Iterating over the session handlers to see if there are
acknowledgements to be sent for partitions
- // which are no longer part of the current subscription.
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
if (nodesWithPendingRequests.contains(node.id())) {
- log.trace("Skipping fetch because previous fetch request
to {} has not been processed", node.id());
+ log.trace("Skipping fetch because previous fetch request
to {} has not been processed", nodeId);
} else {
- for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
- if (!fetchedPartitions.contains(tip)) {
- Acknowledgements acknowledgementsToSend =
fetchAcknowledgementsToSend.remove(tip);
-
- if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
- fetchAcknowledgementsInFlight.put(tip,
acknowledgementsToSend);
-
- sessionHandler.addPartitionToFetch(tip,
acknowledgementsToSend);
- handlerMap.put(node, sessionHandler);
-
- partitionsToForgetMap.putIfAbsent(node, new
ArrayList<>());
- partitionsToForgetMap.get(node).add(tip);
-
- topicNamesMap.putIfAbsent(new
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
- fetchedPartitions.add(tip);
- log.debug("Added fetch request for previously
subscribed partition {} to node {}", tip, node.id());
- }
- }
+ Map<TopicIdPartition, Acknowledgements>
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+ if (nodeAcksFromFetchMap != null) {
+ nodeAcksFromFetchMap.forEach((tip, acks) -> {
+
metricsManager.recordAcknowledgementSent(acks.size());
Review Comment:
Yes, good point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]