iit2009060 commented on code in PR #20565:
URL: https://github.com/apache/kafka/pull/20565#discussion_r2403757513
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java:
##########
@@ -24,40 +24,278 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
class ConsumerRebalanceMetricsManagerTest {
- private final Time time = new MockTime();
- private final Metrics metrics = new Metrics(time);
+ private Time time;
+ private Metrics metrics;
+ private SubscriptionState subscriptionState;
+ private ConsumerRebalanceMetricsManager metricsManager;
+
+ @BeforeEach
+ public void setUp() {
+ time = new MockTime();
+ metrics = new Metrics(time);
+ subscriptionState = new SubscriptionState(mock(LogContext.class),
AutoOffsetResetStrategy.EARLIEST);
+ metricsManager = new ConsumerRebalanceMetricsManager(metrics,
subscriptionState);
+ }
@Test
public void testAssignedPartitionCountMetric() {
- SubscriptionState subscriptionState = new
SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
- ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new
ConsumerRebalanceMetricsManager(metrics, subscriptionState);
-
-
assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount),
"Metric assigned-partitions has not been registered as expected");
+ assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount),
"Metric assigned-partitions has not been registered as expected");
// Check for manually assigned partitions
subscriptionState.assignFromUser(Set.of(new TopicPartition("topic",
0), new TopicPartition("topic", 1)));
- assertEquals(2.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(2.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
subscriptionState.assignFromUser(Set.of());
- assertEquals(0.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(0.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
subscriptionState.unsubscribe();
- assertEquals(0.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(0.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
// Check for automatically assigned partitions
subscriptionState.subscribe(Set.of("topic"), Optional.empty());
subscriptionState.assignFromSubscribed(Set.of(new
TopicPartition("topic", 0)));
- assertEquals(1.0d,
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+ assertEquals(1.0d,
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
+ }
+
+ @Test
+ public void testRebalanceTimingMetrics() {
+
+ // Verify timing metrics are registered
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyAvg));
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyMax));
+ assertNotNull(metrics.metric(metricsManager.rebalanceLatencyTotal));
+ assertNotNull(metrics.metric(metricsManager.rebalanceTotal));
+
+ // Record first rebalance (10ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after first rebalance
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue());
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue());
+ assertEquals(10.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue());
+ assertEquals(1.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // Record second rebalance (30ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(30);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after second rebalance
+ assertEquals(20.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+ "Average latency should be (10 + 30) / 2 = 20ms");
+ assertEquals(30.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+ "Max latency should be max(10, 30) = 30ms");
+ assertEquals(40.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+ "Total latency should be 10 + 30 = 40ms");
+ assertEquals(2.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+ // Record third rebalance (50ms duration)
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(50);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+
+ // Verify metrics after third rebalance
+ assertEquals(30.0d,
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+ "Average latency should be (10 + 30 + 50) / 3 = 30ms");
+ assertEquals(50.0d,
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+ "Max latency should be max(10, 30, 50) = 50ms");
+ assertEquals(90.0d,
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+ "Total latency should be 10 + 30 + 50 = 90ms");
+ assertEquals(3.0d,
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+ }
+
+ @Test
+ public void testRebalanceRateMetric() {
+
+ // Verify rate metric is registered
+ assertNotNull(metrics.metric(metricsManager.rebalanceRatePerHour));
+
+ // Record 3 rebalances within 30ms total (3 x 10ms)
+ for (int i = 0; i < 3; i++) {
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ time.sleep(10);
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ }
+
+ double ratePerHour = (Double)
metrics.metric(metricsManager.rebalanceRatePerHour).metricValue();
+ assertEquals(360.0d, ratePerHour, 1.0,
Review Comment:
Yes, that totally makes sense. In the future this test could fail if these
default values change. It's better to use a metric config of our own.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]