AndrewJSchofield commented on code in PR #18232:
URL: https://github.com/apache/kafka/pull/18232#discussion_r1890416696
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
@Test
public void testRecordsFetchedTopic() {
String topicName1 = TOPIC_NAME;
- String topicName2 = "another-topic";
- Map<String, String> tags1 = topicTags(topicName1);
- Map<String, String> tags2 = topicTags(topicName2);
+ String topicName2 = "another.topic";
+ Map<String, String> tags1 = Map.of("topic", topicName1);
+ Map<String, String> tags2 = Map.of("topic", topicName2);
+ Map<String, String> deprecatedTags = topicTags(topicName2);
+ int initialMetricsSize = metrics.metrics().size();
metricsManager.recordRecordsFetched(topicName1, 2);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
metricsManager.recordRecordsFetched(topicName2, 1);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for topicName2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
time.sleep(metrics.config().timeWindowMs() + 1);
metricsManager.recordRecordsFetched(topicName1, 10);
metricsManager.recordRecordsFetched(topicName2, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for topicName1.
assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags1), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags1) > 0);
+ assertEquals(12,
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+ // Validate metrics for topicName2.
assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags2), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags2) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
deprecatedTags), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
deprecatedTags) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLag() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLag(TP, 14);
- metricsManager.recordPartitionLag(TP, 8);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLag(tp1, 8);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLag(TP, 5);
+ metricsManager.recordPartitionLag(tp1, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
- assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags), EPSILON);
- assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags), EPSILON);
- assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags), EPSILON);
+ assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags1), EPSILON);
+ assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags1), EPSILON);
+ assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLag(tp2, 7);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ metricsManager.recordPartitionLag(tp2, 3);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLag(tp2, 2);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
tags2), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
tags2), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
deprecatedTags), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
deprecatedTags), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLead() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLead(TP, 15);
- metricsManager.recordPartitionLead(TP, 11);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLead(tp1, 15);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp1, 11);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLead(TP, 13);
+ metricsManager.recordPartitionLead(tp1, 13);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags), EPSILON);
- assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags1), EPSILON);
+ assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags1), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLead(tp2, 18);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp2, 12);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLead(tp2, 15);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
tags2), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags2), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
deprecatedTags), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
deprecatedTags), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
deprecatedTags), EPSILON);
+ }
+
+ @Test
+ public void testMaybeUpdateAssignment() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+ int initialMetricsSize = metrics.metrics().size();
+
+ SubscriptionState subscriptionState = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ subscriptionState.assignFromUser(Set.of(tp1));
+
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // 1 new metrics shall be registered.
+ assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+ subscriptionState.assignFromUser(Set.of(tp1, tp2));
+ subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Another 2 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ // Validate preferred read replica metrics.
+ assertEquals(-1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica,
deprecatedTags), EPSILON);
+
+ // Remove tp2 from subscription set.
+ subscriptionState.assignFromUser(Set.of(tp1, tp3));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall remain same as tp2 should be removed and tp3
gets added.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ // Remove all partitions.
+ subscriptionState.assignFromUser(Set.of());
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall be same as initial count as all new metrics
shall be removed.
+ assertEquals(initialMetricsSize, metrics.metrics().size());
+ }
+
+ @Test
+ public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ metricsManager.recordPartitionLead(tp1, 11);
+ metricsManager.recordPartitionLag(tp2, 5);
+ metricsManager.recordPartitionLead(tp2, 1);
+ metricsManager.recordPartitionLag(tp3, 4);
+ metricsManager.recordPartitionLead(tp3, 2);
+
+ int additionalRegisteredMetricsSize = metrics.metrics().size();
Review Comment:
Wouldn't it be sensible to assert the number of additional registered
metrics too?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -32,18 +33,19 @@
import org.junit.jupiter.api.Test;
import java.util.Map;
+import java.util.Set;
import static
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags;
import static
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class FetchMetricsManagerTest {
Review Comment:
Personally, I would put `@SuppressWarnings("deprecation")` on this class
because it is knowingly using deprecated methods.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
@Test
public void testRecordsFetchedTopic() {
String topicName1 = TOPIC_NAME;
- String topicName2 = "another-topic";
- Map<String, String> tags1 = topicTags(topicName1);
- Map<String, String> tags2 = topicTags(topicName2);
+ String topicName2 = "another.topic";
+ Map<String, String> tags1 = Map.of("topic", topicName1);
+ Map<String, String> tags2 = Map.of("topic", topicName2);
+ Map<String, String> deprecatedTags = topicTags(topicName2);
+ int initialMetricsSize = metrics.metrics().size();
metricsManager.recordRecordsFetched(topicName1, 2);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
metricsManager.recordRecordsFetched(topicName2, 1);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for topicName2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
time.sleep(metrics.config().timeWindowMs() + 1);
metricsManager.recordRecordsFetched(topicName1, 10);
metricsManager.recordRecordsFetched(topicName2, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for topicName1.
assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags1), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags1) > 0);
+ assertEquals(12,
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+ // Validate metrics for topicName2.
assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags2), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags2) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
deprecatedTags), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
deprecatedTags) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLag() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLag(TP, 14);
- metricsManager.recordPartitionLag(TP, 8);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLag(tp1, 8);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLag(TP, 5);
+ metricsManager.recordPartitionLag(tp1, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
- assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags), EPSILON);
- assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags), EPSILON);
- assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags), EPSILON);
+ assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags1), EPSILON);
+ assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags1), EPSILON);
+ assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLag(tp2, 7);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ metricsManager.recordPartitionLag(tp2, 3);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLag(tp2, 2);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
tags2), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
tags2), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
deprecatedTags), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
deprecatedTags), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLead() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLead(TP, 15);
- metricsManager.recordPartitionLead(TP, 11);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLead(tp1, 15);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp1, 11);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLead(TP, 13);
+ metricsManager.recordPartitionLead(tp1, 13);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags), EPSILON);
- assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags1), EPSILON);
+ assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags1), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLead(tp2, 18);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp2, 12);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLead(tp2, 15);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
tags2), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags2), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
deprecatedTags), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
deprecatedTags), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
deprecatedTags), EPSILON);
+ }
+
+ @Test
+ public void testMaybeUpdateAssignment() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+ int initialMetricsSize = metrics.metrics().size();
+
+ SubscriptionState subscriptionState = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ subscriptionState.assignFromUser(Set.of(tp1));
+
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // 1 new metrics shall be registered.
+ assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+ subscriptionState.assignFromUser(Set.of(tp1, tp2));
+ subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Another 2 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ // Validate preferred read replica metrics.
+ assertEquals(-1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica,
deprecatedTags), EPSILON);
+
+ // Remove tp2 from subscription set.
+ subscriptionState.assignFromUser(Set.of(tp1, tp3));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall remain same as tp2 should be removed and tp3
gets added.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ // Remove all partitions.
+ subscriptionState.assignFromUser(Set.of());
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall be same as initial count as all new metrics
shall be removed.
+ assertEquals(initialMetricsSize, metrics.metrics().size());
+ }
+
+ @Test
+ public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ metricsManager.recordPartitionLead(tp1, 11);
+ metricsManager.recordPartitionLag(tp2, 5);
+ metricsManager.recordPartitionLead(tp2, 1);
+ metricsManager.recordPartitionLag(tp3, 4);
+ metricsManager.recordPartitionLead(tp3, 2);
+
+ int additionalRegisteredMetricsSize = metrics.metrics().size();
+
+ SubscriptionState subscriptionState = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ subscriptionState.assignFromUser(Set.of(tp1, tp2, tp3));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+
+ // 5 new metrics shall be registered.
+ assertEquals(5, metrics.metrics().size() -
additionalRegisteredMetricsSize);
+
+ // Remove 1 topic which has deprecated metrics as well.
+ subscriptionState.assignFromUser(Set.of(tp1, tp2));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // For tp2, 14 metrics will be unregistered. 3 for partition lag, 3
for partition lead, 1 for
+ // preferred read replica and similarly 7 deprecated metrics.
+ assertEquals(-9, metrics.metrics().size() -
additionalRegisteredMetricsSize);
Review Comment:
This -9 is needlessly obscure.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]