junrao commented on code in PR #18232:
URL: https://github.com/apache/kafka/pull/18232#discussion_r1888991498


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java:
##########
@@ -162,16 +170,22 @@ void maybeUpdateAssignment(SubscriptionState 
subscription) {
                     metrics.removeSensor(partitionRecordsLagMetricName(tp));
                     metrics.removeSensor(partitionRecordsLeadMetricName(tp));
                     
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
+                    // Remove deprecated metrics.
+                    
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp)));
+                    
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp)));
+                    
metrics.removeMetric(partitionPreferredReadReplicaMetricNameDeprecated(tp));

Review Comment:
   partitionPreferredReadReplicaMetricNameDeprecated => 
deprecatedPartitionPreferredReadReplicaMetricName to be consistent with other 
names?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -117,20 +119,40 @@ public void testBytesFetched() {
     @Test
     public void testBytesFetchedTopic() {
         String topicName1 = TOPIC_NAME;
-        String topicName2 = "another-topic";
-        Map<String, String> tags1 = topicTags(topicName1);
-        Map<String, String> tags2 = topicTags(topicName2);
+        String topicName2 = "another.topic";
+        Map<String, String> tags1 = Map.of("topic", topicName1);
+        Map<String, String> tags2 = Map.of("topic", topicName2);
+        Map<String, String> deprecatedTags = topicTags(topicName2);
+        int initialMetricsSize = metrics.metrics().size();
 
         metricsManager.recordBytesFetched(topicName1, 2);
+        // 4 new metrics shall be registered.
+        assertEquals(4, metrics.metrics().size() - initialMetricsSize);
         metricsManager.recordBytesFetched(topicName2, 1);
+        // Another 8 metrics gets registered as deprecated metrics should be 
reported for topicName2.

Review Comment:
   gets => get



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java:
##########
@@ -181,6 +195,67 @@ void maybeUpdateAssignment(SubscriptionState subscription) 
{
         }
     }
 
+    @Deprecated // To be removed in Kafka 5.0 release.
+    private void maybeRecordDeprecatedBytesFetched(String name, String topic, 
int bytes) {

Review Comment:
   It would be useful to follow the approach used in 
https://github.com/apache/kafka/pull/11302/files to add "deprecated" in the 
description of the metric name and update the ops doc about the deprecation.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
     @Test
     public void testRecordsFetchedTopic() {
         String topicName1 = TOPIC_NAME;
-        String topicName2 = "another-topic";
-        Map<String, String> tags1 = topicTags(topicName1);
-        Map<String, String> tags2 = topicTags(topicName2);
+        String topicName2 = "another.topic";
+        Map<String, String> tags1 = Map.of("topic", topicName1);
+        Map<String, String> tags2 = Map.of("topic", topicName2);
+        Map<String, String> deprecatedTags = topicTags(topicName2);
+        int initialMetricsSize = metrics.metrics().size();
 
         metricsManager.recordRecordsFetched(topicName1, 2);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
         metricsManager.recordRecordsFetched(topicName2, 1);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for topicName2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
         time.sleep(metrics.config().timeWindowMs() + 1);
         metricsManager.recordRecordsFetched(topicName1, 10);
         metricsManager.recordRecordsFetched(topicName2, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for topicName1.
         assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags1), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags1) > 0);
+        assertEquals(12, 
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+        // Validate metrics for topicName2.
         assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags2), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags2) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
deprecatedTags), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
deprecatedTags) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLag() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLag(TP, 14);
-        metricsManager.recordPartitionLag(TP, 8);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLag(tp1, 8);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLag(TP, 5);
+        metricsManager.recordPartitionLag(tp1, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
-        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags), EPSILON);
-        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags), EPSILON);
-        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags), EPSILON);
+        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags1), EPSILON);
+        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags1), EPSILON);
+        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLag(tp2, 7);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        metricsManager.recordPartitionLag(tp2, 3);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLag(tp2, 2);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
tags2), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags2), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
deprecatedTags), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
deprecatedTags), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLead() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLead(TP, 15);
-        metricsManager.recordPartitionLead(TP, 11);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLead(tp1, 15);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp1, 11);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLead(TP, 13);
+        metricsManager.recordPartitionLead(tp1, 13);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags), EPSILON);
-        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags1), EPSILON);
+        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags1), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLead(tp2, 18);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp2, 12);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLead(tp2, 15);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
tags2), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags2), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
deprecatedTags), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
deprecatedTags), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
deprecatedTags), EPSILON);
+    }
+
+    @Test
+    public void testMaybeUpdateAssignment() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+        int initialMetricsSize = metrics.metrics().size();
+
+        SubscriptionState subscriptionState = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        subscriptionState.assignFromUser(Set.of(tp1));
+
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // 1 new metrics shall be registered.
+        assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+        subscriptionState.assignFromUser(Set.of(tp1, tp2));
+        subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Another 2 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        // Validate preferred read replica metrics.
+        assertEquals(-1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, 
deprecatedTags), EPSILON);
+
+        // Remove tp2 from subscription set.
+        subscriptionState.assignFromUser(Set.of(tp1, tp3));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall remain same as tp2 should be removed and tp3 
gets added.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        // Remove all partitions.
+        subscriptionState.assignFromUser(Set.of());
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall be same as initial count as all new metrics 
shall be removed.
+        assertEquals(initialMetricsSize, metrics.metrics().size());
+    }
+
+    @Test
+    public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        metricsManager.recordPartitionLead(tp1, 11);
+        metricsManager.recordPartitionLag(tp2, 5);
+        metricsManager.recordPartitionLead(tp2, 1);
+        metricsManager.recordPartitionLag(tp3, 4);
+        metricsManager.recordPartitionLead(tp3, 2);
+
+        int additionalRegisteredMetricsSize = metrics.metrics().size();
+
+        SubscriptionState subscriptionState = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        subscriptionState.assignFromUser(Set.of(tp1, tp2, tp3));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+
+        // 5 new metrics shall be registered.
+        assertEquals(5, metrics.metrics().size() - 
additionalRegisteredMetricsSize);
+
+        // Remove 1 topic which has deprecated metrics as well.

Review Comment:
   1 topic => 1 partition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to