junrao commented on code in PR #18232:
URL: https://github.com/apache/kafka/pull/18232#discussion_r1888991498
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java:
##########
@@ -162,16 +170,22 @@ void maybeUpdateAssignment(SubscriptionState
subscription) {
metrics.removeSensor(partitionRecordsLagMetricName(tp));
metrics.removeSensor(partitionRecordsLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
+ // Remove deprecated metrics.
+
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp)));
+
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp)));
+
metrics.removeMetric(partitionPreferredReadReplicaMetricNameDeprecated(tp));
Review Comment:
Could we rename partitionPreferredReadReplicaMetricNameDeprecated to
deprecatedPartitionPreferredReadReplicaMetricName, so that it is consistent
with the other names (e.g. deprecatedMetricName)?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -117,20 +119,40 @@ public void testBytesFetched() {
@Test
public void testBytesFetchedTopic() {
String topicName1 = TOPIC_NAME;
- String topicName2 = "another-topic";
- Map<String, String> tags1 = topicTags(topicName1);
- Map<String, String> tags2 = topicTags(topicName2);
+ String topicName2 = "another.topic";
+ Map<String, String> tags1 = Map.of("topic", topicName1);
+ Map<String, String> tags2 = Map.of("topic", topicName2);
+ Map<String, String> deprecatedTags = topicTags(topicName2);
+ int initialMetricsSize = metrics.metrics().size();
metricsManager.recordBytesFetched(topicName1, 2);
+ // 4 new metrics shall be registered.
+ assertEquals(4, metrics.metrics().size() - initialMetricsSize);
metricsManager.recordBytesFetched(topicName2, 1);
+ // Another 8 metrics gets registered as deprecated metrics should be
reported for topicName2.
Review Comment:
Typo in the comment: "gets" => "get" (i.e. "Another 8 metrics get registered").
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java:
##########
@@ -181,6 +195,67 @@ void maybeUpdateAssignment(SubscriptionState subscription)
{
}
}
+ @Deprecated // To be removed in Kafka 5.0 release.
+ private void maybeRecordDeprecatedBytesFetched(String name, String topic,
int bytes) {
Review Comment:
It would be useful to follow the approach used in
https://github.com/apache/kafka/pull/11302/files: add "deprecated" to the
metric's description and update the ops doc to mention the deprecation.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java:
##########
@@ -145,46 +167,209 @@ public void testRecordsFetched() {
@Test
public void testRecordsFetchedTopic() {
String topicName1 = TOPIC_NAME;
- String topicName2 = "another-topic";
- Map<String, String> tags1 = topicTags(topicName1);
- Map<String, String> tags2 = topicTags(topicName2);
+ String topicName2 = "another.topic";
+ Map<String, String> tags1 = Map.of("topic", topicName1);
+ Map<String, String> tags2 = Map.of("topic", topicName2);
+ Map<String, String> deprecatedTags = topicTags(topicName2);
+ int initialMetricsSize = metrics.metrics().size();
metricsManager.recordRecordsFetched(topicName1, 2);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
metricsManager.recordRecordsFetched(topicName2, 1);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for topicName2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
time.sleep(metrics.config().timeWindowMs() + 1);
metricsManager.recordRecordsFetched(topicName1, 10);
metricsManager.recordRecordsFetched(topicName2, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for topicName1.
assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags1), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags1) > 0);
+ assertEquals(12,
metricValue(metricsRegistry.topicRecordsConsumedTotal, tags1), EPSILON);
+ // Validate metrics for topicName2.
assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
tags2), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
tags2) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg,
deprecatedTags), EPSILON);
+ assertTrue(metricValue(metricsRegistry.topicRecordsConsumedRate,
deprecatedTags) > 0);
+ assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLag() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLag(TP, 14);
- metricsManager.recordPartitionLag(TP, 8);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLag(tp1, 8);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLag(TP, 5);
+ metricsManager.recordPartitionLag(tp1, 5);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
- assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags), EPSILON);
- assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags), EPSILON);
- assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags), EPSILON);
+ assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag,
tags1), EPSILON);
+ assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax,
tags1), EPSILON);
+ assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLag(tp2, 7);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ metricsManager.recordPartitionLag(tp2, 3);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLag(tp2, 2);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(7, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
tags2), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
tags2), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag,
deprecatedTags), EPSILON);
+ assertEquals(7, metricValue(metricsRegistry.partitionRecordsLagMax,
deprecatedTags), EPSILON);
+ assertEquals(4, metricValue(metricsRegistry.partitionRecordsLagAvg,
deprecatedTags), EPSILON);
}
@Test
public void testPartitionLead() {
- Map<String, String> tags = topicPartitionTags(TP);
- metricsManager.recordPartitionLead(TP, 15);
- metricsManager.recordPartitionLead(TP, 11);
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLead(tp1, 15);
+ // 3 new metrics shall be registered.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp1, 11);
time.sleep(metrics.config().timeWindowMs() + 1);
- metricsManager.recordPartitionLead(TP, 13);
+ metricsManager.recordPartitionLead(tp1, 13);
+ // Subsequent calls should not register new metrics.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp1.
assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags), EPSILON);
- assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags), EPSILON);
- assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead,
tags1), EPSILON);
+ assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags1), EPSILON);
+ assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags1), EPSILON);
+
+ metricsManager.recordPartitionLead(tp2, 18);
+ // Another 6 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+
+ metricsManager.recordPartitionLead(tp2, 12);
+ time.sleep(metrics.config().timeWindowMs() + 1);
+ metricsManager.recordPartitionLead(tp2, 15);
+
+ // Subsequent calls should not register new metrics.
+ assertEquals(9, metrics.metrics().size() - initialMetricsSize);
+ // Validate metrics for tp2.
+ assertEquals(12, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
tags2), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
tags2), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
tags2), EPSILON);
+ // Validate metrics for deprecated topic.
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLead,
deprecatedTags), EPSILON);
+ assertEquals(12, metricValue(metricsRegistry.partitionRecordsLeadMin,
deprecatedTags), EPSILON);
+ assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg,
deprecatedTags), EPSILON);
+ }
+
+ @Test
+ public void testMaybeUpdateAssignment() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+ int initialMetricsSize = metrics.metrics().size();
+
+ SubscriptionState subscriptionState = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ subscriptionState.assignFromUser(Set.of(tp1));
+
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // 1 new metrics shall be registered.
+ assertEquals(1, metrics.metrics().size() - initialMetricsSize);
+
+ subscriptionState.assignFromUser(Set.of(tp1, tp2));
+ subscriptionState.updatePreferredReadReplica(tp2, 1, () -> 0L);
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Another 2 metrics gets registered as deprecated metrics should be
reported for tp2.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ Map<String, String> tags1 = Map.of("topic", tp1.topic(), "partition",
String.valueOf(tp1.partition()));
+ Map<String, String> tags2 = Map.of("topic", tp2.topic(), "partition",
String.valueOf(tp2.partition()));
+ Map<String, String> deprecatedTags = topicPartitionTags(tp2);
+ // Validate preferred read replica metrics.
+ assertEquals(-1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags1),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, tags2),
EPSILON);
+ assertEquals(1,
readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica,
deprecatedTags), EPSILON);
+
+ // Remove tp2 from subscription set.
+ subscriptionState.assignFromUser(Set.of(tp1, tp3));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall remain same as tp2 should be removed and tp3
gets added.
+ assertEquals(3, metrics.metrics().size() - initialMetricsSize);
+
+ // Remove all partitions.
+ subscriptionState.assignFromUser(Set.of());
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+ // Metrics count shall be same as initial count as all new metrics
shall be removed.
+ assertEquals(initialMetricsSize, metrics.metrics().size());
+ }
+
+ @Test
+ public void testMaybeUpdateAssignmentWithAdditionalRegisteredMetrics() {
+ TopicPartition tp1 = new TopicPartition(TOPIC_NAME, 0);
+ TopicPartition tp2 = new TopicPartition("another.topic", 0);
+ TopicPartition tp3 = new TopicPartition("another.topic", 1);
+
+ int initialMetricsSize = metrics.metrics().size();
+
+ metricsManager.recordPartitionLag(tp1, 14);
+ metricsManager.recordPartitionLead(tp1, 11);
+ metricsManager.recordPartitionLag(tp2, 5);
+ metricsManager.recordPartitionLead(tp2, 1);
+ metricsManager.recordPartitionLag(tp3, 4);
+ metricsManager.recordPartitionLead(tp3, 2);
+
+ int additionalRegisteredMetricsSize = metrics.metrics().size();
+
+ SubscriptionState subscriptionState = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ subscriptionState.assignFromUser(Set.of(tp1, tp2, tp3));
+ metricsManager.maybeUpdateAssignment(subscriptionState);
+
+ // 5 new metrics shall be registered.
+ assertEquals(5, metrics.metrics().size() -
additionalRegisteredMetricsSize);
+
+ // Remove 1 topic which has deprecated metrics as well.
Review Comment:
The comment should say "1 partition" rather than "1 topic".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]