AndrewJSchofield commented on code in PR #18444: URL: https://github.com/apache/kafka/pull/18444#discussion_r1919821153
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -363,24 +387,31 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part } }); - return mapAcknowledgementFutures(futuresMap); + return mapAcknowledgementFutures(futuresMap, Optional.empty()); } - private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) { + private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures( + Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap, + Optional<Consumer<Collection<String>>> metricsHandler Review Comment: This really is the `failedMetricsHandler` ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -695,6 +735,21 @@ private static void removeSharePartitionFromCache( } } + /** + * The handler to update the failed share acknowledge request metrics. + * + * @return A Consumer that updates the failed share acknowledge request metrics. + */ + private Consumer<Collection<String>> failedShareAcknowledgeMetricsHandler() { Review Comment: I think this `Collection` is actually a `Set` in practice. Given that you're taking steps to ensure that there are no duplicates in the collection, I personally would make the signature use a `Set` also. ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -302,7 +320,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part } }); - return mapAcknowledgementFutures(futures); + // Update the metrics for the topics for which we have received an acknowledgement. + topics.forEach(topic -> { + brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark(); Review Comment: I believe it's called for both. I don't agree that they need to be separated out. There are two equivalent ways of acknowledging delivery and I would include both in the same metrics. ########## server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java: ########## @@ -202,11 +209,18 @@ public synchronized Set<TopicIdPartition> filterErroneousTopicPartitions(Set<Top private synchronized void addErroneousToResponse(Map<TopicIdPartition, PartitionData> response) { if (erroneous != null) { + // Track the failed topics for metrics. + Set<String> erroneousTopics = new HashSet<>(); erroneous.forEach((topicIdPartition, throwable) -> { + erroneousTopics.add(topicIdPartition.topic()); response.put(topicIdPartition, new PartitionData() .setErrorCode(Errors.forException(throwable).code()) Review Comment: I would expect the partition index to be initialised in the `PartitionData` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org