AndrewJSchofield commented on code in PR #18444:
URL: https://github.com/apache/kafka/pull/18444#discussion_r1919821153


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -363,24 +387,31 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
             }
         });
 
-        return mapAcknowledgementFutures(futuresMap);
+        return mapAcknowledgementFutures(futuresMap, Optional.empty());
     }
 
-    private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) {
+    private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
+        Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
+        Optional<Consumer<Collection<String>>> metricsHandler

Review Comment:
   This is really the `failedMetricsHandler`, so I would name the parameter accordingly.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -695,6 +735,21 @@ private static void removeSharePartitionFromCache(
         }
     }
 
+    /**
+     * The handler to update the failed share acknowledge request metrics.
+     *
+     * @return A Consumer that updates the failed share acknowledge request metrics.
+     */
+    private Consumer<Collection<String>> failedShareAcknowledgeMetricsHandler() {

Review Comment:
   I think this `Collection` is actually a `Set` in practice. Given that you're 
taking steps to ensure that there are no duplicates in the collection, I 
personally would make the signature use a `Set` also.
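
   A minimal sketch of what the `Set`-based signature could look like. The
   `FAILED_COUNTS` map is a hypothetical stand-in for the real per-topic
   meters and the class name is made up for illustration; only the handler
   name comes from the diff above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class FailedAckMetricsSketch {
    // Hypothetical stand-in for the per-topic failure meters (not Kafka's BrokerTopicStats).
    static final Map<String, Integer> FAILED_COUNTS = new HashMap<>();

    // Accepting a Set puts the "no duplicate topics" guarantee into the signature,
    // so each failed topic is counted at most once per request.
    static Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
        return topics -> topics.forEach(topic -> FAILED_COUNTS.merge(topic, 1, Integer::sum));
    }

    public static void main(String[] args) {
        failedShareAcknowledgeMetricsHandler().accept(Set.of("orders", "payments"));
        System.out.println(FAILED_COUNTS);
    }
}
```

   With this shape the caller has to build a `Set` up front, which matches
   what the code already does to avoid duplicates.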



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -302,7 +320,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
             }
         });
 
-        return mapAcknowledgementFutures(futures);
+        // Update the metrics for the topics for which we have received an acknowledgement.
+        topics.forEach(topic -> {
+            brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark();

Review Comment:
   I believe it's called for both. I don't agree that they need to be separated 
out. There are two equivalent ways of acknowledging delivery and I would 
include both in the same metrics.



##########
server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java:
##########
@@ -202,11 +209,18 @@ public synchronized Set<TopicIdPartition> filterErroneousTopicPartitions(Set<Top
 
     private synchronized void addErroneousToResponse(Map<TopicIdPartition, PartitionData> response) {
         if (erroneous != null) {
+            // Track the failed topics for metrics.
+            Set<String> erroneousTopics = new HashSet<>();
             erroneous.forEach((topicIdPartition, throwable) -> {
+                erroneousTopics.add(topicIdPartition.topic());
                 response.put(topicIdPartition, new PartitionData()
                     .setErrorCode(Errors.forException(throwable).code())

Review Comment:
   I would expect the partition index to be initialised in the `PartitionData` 
here.
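
   Roughly what I have in mind, as a self-contained sketch. `PartitionData`
   and `TopicPartitionKey` below are hypothetical stand-ins for the generated
   message class and `TopicIdPartition`, and the error code is passed in
   directly instead of being derived from a `Throwable`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ErroneousResponseSketch {
    // Hypothetical stand-in for the generated PartitionData message class.
    static final class PartitionData {
        int partitionIndex;
        short errorCode;

        PartitionData setPartitionIndex(int value) { partitionIndex = value; return this; }
        PartitionData setErrorCode(short value) { errorCode = value; return this; }
    }

    // Hypothetical stand-in for TopicIdPartition (topic name plus partition number).
    static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Builds one response entry per erroneous partition, setting the partition
    // index alongside the error code so the entry identifies its partition.
    static Map<TopicPartitionKey, PartitionData> addErroneousToResponse(Map<TopicPartitionKey, Short> erroneous) {
        Map<TopicPartitionKey, PartitionData> response = new LinkedHashMap<>();
        erroneous.forEach((tp, errorCode) -> response.put(tp, new PartitionData()
            .setPartitionIndex(tp.partition)
            .setErrorCode(errorCode)));
        return response;
    }
}
```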



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
