AndrewJSchofield commented on code in PR #18232:
URL: https://github.com/apache/kafka/pull/18232#discussion_r1890416696


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
     @Test
     public void testRecordsFetchedTopic() {
         String topicName1 = TOPIC_NAME;
-        String topicName2 = "another-topic";
-        Map<String, String> tags1 = topicTags(topicName1);
-        Map<String, String> tags2 = topicTags(topicName2);
+        String topicName2 = "another.topic";
+        Map<String, String> tags1 = Map.of("topic", topicName1);
+        Map<String, String> tags2 = Map.of("topic", topicName2);
+        Map<String, String> deprecatedTags = topicTags(topicName2);
+        int initialMetricsSize = metrics.metrics().size();
 
         metricsManager.recordRecordsFetched(topicName1, 2);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
         metricsManager.recordRecordsFetched(topicName2, 1);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for topicName2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
         time.sleep(metrics.config().timeWindowMs() + 1);
         metricsManager.recordRecordsFetched(topicName1, 10);
         metricsManager.recordRecordsFetched(topicName2, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for topicName1.
         assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags1), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags1) > 0);
+        assertEquals(12, 
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+        // Validate metrics for topicName2.
         assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags2), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags2) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
deprecatedTags), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
deprecatedTags) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLag() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLag(TP, 14);
-        metricsManager.recordPartitionLag(TP, 8);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLag(tp1, 8);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLag(TP, 5);
+        metricsManager.recordPartitionLag(tp1, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
-        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags), EPSILON);
-        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags), EPSILON);
-        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags), EPSILON);
+        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags1), EPSILON);
+        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags1), EPSILON);
+        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLag(tp2, 7);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        metricsManager.recordPartitionLag(tp2, 3);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLag(tp2, 2);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
tags2), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags2), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
deprecatedTags), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
deprecatedTags), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLead() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLead(TP, 15);
-        metricsManager.recordPartitionLead(TP, 11);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLead(tp1, 15);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp1, 11);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLead(TP, 13);
+        metricsManager.recordPartitionLead(tp1, 13);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags), EPSILON);
-        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags1), EPSILON);
+        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags1), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLead(tp2, 18);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp2, 12);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLead(tp2, 15);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
tags2), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags2), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
deprecatedTags), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
deprecatedTags), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
deprecatedTags), EPSILON);
+    }
+
+    @Test
+    public void testMaybeUpdateAssignment() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+        int initialMetricsSize = metrics.metrics().size();
+
+        SubscriptionState subscriptionState = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        subscriptionState.assignFromUser(Set.of(tp1));
+
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // 1 new metrics shall be registered.
+        assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+        subscriptionState.assignFromUser(Set.of(tp1, tp2));
+        subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Another 2 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        // Validate preferred read replica metrics.
+        assertEquals(-1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, 
deprecatedTags), EPSILON);
+
+        // Remove tp2 from subscription set.
+        subscriptionState.assignFromUser(Set.of(tp1, tp3));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall remain same as tp2 should be removed and tp3 
gets added.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        // Remove all partitions.
+        subscriptionState.assignFromUser(Set.of());
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall be same as initial count as all new metrics 
shall be removed.
+        assertEquals(initialMetricsSize, metrics.metrics().size());
+    }
+
+    @Test
+    public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        metricsManager.recordPartitionLead(tp1, 11);
+        metricsManager.recordPartitionLag(tp2, 5);
+        metricsManager.recordPartitionLead(tp2, 1);
+        metricsManager.recordPartitionLag(tp3, 4);
+        metricsManager.recordPartitionLead(tp3, 2);
+
+        int additionalRegisteredMetricsSize = metrics.metrics().size();

Review Comment:
   Wouldn't it be sensible to assert the number of additional registered 
metrics too?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -32,18 +33,19 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Map;
+import java.util.Set;
 
 import static 
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags;
 import static 
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FetchMetricsManagerTest {

Review Comment:
   Personally, I would put `@SuppressWarnings("deprecation")` on this class 
because it is knowingly using deprecated methods.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
     @Test
     public void testRecordsFetchedTopic() {
         String topicName1 = TOPIC_NAME;
-        String topicName2 = "another-topic";
-        Map<String, String> tags1 = topicTags(topicName1);
-        Map<String, String> tags2 = topicTags(topicName2);
+        String topicName2 = "another.topic";
+        Map<String, String> tags1 = Map.of("topic", topicName1);
+        Map<String, String> tags2 = Map.of("topic", topicName2);
+        Map<String, String> deprecatedTags = topicTags(topicName2);
+        int initialMetricsSize = metrics.metrics().size();
 
         metricsManager.recordRecordsFetched(topicName1, 2);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
         metricsManager.recordRecordsFetched(topicName2, 1);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for topicName2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
         time.sleep(metrics.config().timeWindowMs() + 1);
         metricsManager.recordRecordsFetched(topicName1, 10);
         metricsManager.recordRecordsFetched(topicName2, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for topicName1.
         assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags1), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags1) > 0);
+        assertEquals(12, 
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+        // Validate metrics for topicName2.
         assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags2), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
tags2) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
deprecatedTags), EPSILON);
+        assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate, 
deprecatedTags) > 0);
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLag() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLag(TP, 14);
-        metricsManager.recordPartitionLag(TP, 8);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLag(tp1, 8);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLag(TP, 5);
+        metricsManager.recordPartitionLag(tp1, 5);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
-        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags), EPSILON);
-        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags), EPSILON);
-        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags), EPSILON);
+        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags1), EPSILON);
+        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags1), EPSILON);
+        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLag(tp2, 7);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        metricsManager.recordPartitionLag(tp2, 3);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLag(tp2, 2);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
tags2), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags2), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, 
deprecatedTags), EPSILON);
+        assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax, 
deprecatedTags), EPSILON);
+        assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg, 
deprecatedTags), EPSILON);
     }
 
     @Test
     public void testPartitionLead() {
-        Map<String, String> tags = topicPartitionTags(TP);
-        metricsManager.recordPartitionLead(TP, 15);
-        metricsManager.recordPartitionLead(TP, 11);
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLead(tp1, 15);
+        // 3 new metrics shall be registered.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp1, 11);
         time.sleep(metrics.config().timeWindowMs() + 1);
-        metricsManager.recordPartitionLead(TP, 13);
+        metricsManager.recordPartitionLead(tp1, 13);
 
+        // Subsequent calls should not register new metrics.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp1.
         assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags), EPSILON);
-        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags), EPSILON);
-        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags1), EPSILON);
+        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags1), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags1), EPSILON);
+
+        metricsManager.recordPartitionLead(tp2, 18);
+        // Another 6 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+        metricsManager.recordPartitionLead(tp2, 12);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLead(tp2, 15);
+
+        // Subsequent calls should not register new metrics.
+        assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+        // Validate metrics for tp2.
+        assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
tags2), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags2), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags2), EPSILON);
+        // Validate metrics for deprecated topic.
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead, 
deprecatedTags), EPSILON);
+        assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin, 
deprecatedTags), EPSILON);
+        assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
deprecatedTags), EPSILON);
+    }
+
+    @Test
+    public void testMaybeUpdateAssignment() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+        int initialMetricsSize = metrics.metrics().size();
+
+        SubscriptionState subscriptionState = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        subscriptionState.assignFromUser(Set.of(tp1));
+
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // 1 new metrics shall be registered.
+        assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+        subscriptionState.assignFromUser(Set.of(tp1, tp2));
+        subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Another 2 metrics gets registered as deprecated metrics should be 
reported for tp2.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition", 
String.valueOf(tp1.partition()));
+        Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition", 
String.valueOf(tp2.partition()));
+        Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+        // Validate preferred read replica metrics.
+        assertEquals(-1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2), 
EPSILON);
+        assertEquals(1, 
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, 
deprecatedTags), EPSILON);
+
+        // Remove tp2 from subscription set.
+        subscriptionState.assignFromUser(Set.of(tp1, tp3));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall remain same as tp2 should be removed and tp3 
gets added.
+        assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+        // Remove all partitions.
+        subscriptionState.assignFromUser(Set.of());
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // Metrics count shall be same as initial count as all new metrics 
shall be removed.
+        assertEquals(initialMetricsSize, metrics.metrics().size());
+    }
+
+    @Test
+    public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+        TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+        TopicPartition tp2 = new TopicPartition("another.topic", 0);
+        TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+        int initialMetricsSize = metrics.metrics().size();
+
+        metricsManager.recordPartitionLag(tp1, 14);
+        metricsManager.recordPartitionLead(tp1, 11);
+        metricsManager.recordPartitionLag(tp2, 5);
+        metricsManager.recordPartitionLead(tp2, 1);
+        metricsManager.recordPartitionLag(tp3, 4);
+        metricsManager.recordPartitionLead(tp3, 2);
+
+        int additionalRegisteredMetricsSize = metrics.metrics().size();
+
+        SubscriptionState subscriptionState = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        subscriptionState.assignFromUser(Set.of(tp1, tp2, tp3));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+
+        // 5 new metrics shall be registered.
+        assertEquals(5, metrics.metrics().size() - 
additionalRegisteredMetricsSize);
+
+        // Remove 1 topic which has deprecated metrics as well.
+        subscriptionState.assignFromUser(Set.of(tp1, tp2));
+        metricsManager.maybeUpdateAssignment(subscriptionState);
+        // For tp2, 14 metrics will be unregistered. 3 for partition lag, 3 
for partition lead, 1 for
+        // preferred read replica and similarly 7 deprecated metrics.
+        assertEquals(-9, metrics.metrics().size() - 
additionalRegisteredMetricsSize);

Review Comment:
   This -9 is needlessly obscure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to