apalan60 commented on code in PR #20565:
URL: https://github.com/apache/kafka/pull/20565#discussion_r2402686809


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java:
##########
@@ -24,40 +24,278 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 
 class ConsumerRebalanceMetricsManagerTest {
 
-    private final Time time = new MockTime();
-    private final Metrics metrics = new Metrics(time);
+    private Time time;
+    private Metrics metrics;
+    private SubscriptionState subscriptionState;
+    private ConsumerRebalanceMetricsManager metricsManager;
+
+    @BeforeEach
+    public void setUp() {
+        time = new MockTime();
+        metrics = new Metrics(time);
+        subscriptionState = new SubscriptionState(mock(LogContext.class), 
AutoOffsetResetStrategy.EARLIEST);
+        metricsManager = new ConsumerRebalanceMetricsManager(metrics, 
subscriptionState);
+    }
 
     @Test
     public void testAssignedPartitionCountMetric() {
-        SubscriptionState subscriptionState = new 
SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
-        ConsumerRebalanceMetricsManager consumerRebalanceMetricsManager = new 
ConsumerRebalanceMetricsManager(metrics, subscriptionState);
-
-        
assertNotNull(metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount),
 "Metric assigned-partitions has not been registered as expected");
+        assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount), 
"Metric assigned-partitions has not been registered as expected");
 
         // Check for manually assigned partitions
         subscriptionState.assignFromUser(Set.of(new TopicPartition("topic", 
0), new TopicPartition("topic", 1)));
-        assertEquals(2.0d, 
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+        assertEquals(2.0d, 
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
         subscriptionState.assignFromUser(Set.of());
-        assertEquals(0.0d, 
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+        assertEquals(0.0d, 
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
 
         subscriptionState.unsubscribe();
-        assertEquals(0.0d, 
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+        assertEquals(0.0d, 
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
 
         // Check for automatically assigned partitions
         subscriptionState.subscribe(Set.of("topic"), Optional.empty());
         subscriptionState.assignFromSubscribed(Set.of(new 
TopicPartition("topic", 0)));
-        assertEquals(1.0d, 
metrics.metric(consumerRebalanceMetricsManager.assignedPartitionsCount).metricValue());
+        assertEquals(1.0d, 
metrics.metric(metricsManager.assignedPartitionsCount).metricValue());
+    }
+
+    @Test
+    public void testRebalanceTimingMetrics() {
+
+        // Verify timing metrics are registered
+        assertNotNull(metrics.metric(metricsManager.rebalanceLatencyAvg));
+        assertNotNull(metrics.metric(metricsManager.rebalanceLatencyMax));
+        assertNotNull(metrics.metric(metricsManager.rebalanceLatencyTotal));
+        assertNotNull(metrics.metric(metricsManager.rebalanceTotal));
+
+        // Record first rebalance (10ms duration)
+        metricsManager.recordRebalanceStarted(time.milliseconds());
+        time.sleep(10);
+        metricsManager.recordRebalanceEnded(time.milliseconds());
+
+        // Verify metrics after first rebalance
+        assertEquals(10.0d, 
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue());
+        assertEquals(10.0d, 
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue());
+        assertEquals(10.0d, 
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue());
+        assertEquals(1.0d, 
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+        // Record second rebalance (30ms duration)
+        metricsManager.recordRebalanceStarted(time.milliseconds());
+        time.sleep(30);
+        metricsManager.recordRebalanceEnded(time.milliseconds());
+
+        // Verify metrics after second rebalance
+        assertEquals(20.0d, 
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+                "Average latency should be (10 + 30) / 2 = 20ms");
+        assertEquals(30.0d, 
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+                "Max latency should be max(10, 30) = 30ms");
+        assertEquals(40.0d, 
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+                "Total latency should be 10 + 30 = 40ms");
+        assertEquals(2.0d, 
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+
+        // Record third rebalance (50ms duration)
+        metricsManager.recordRebalanceStarted(time.milliseconds());
+        time.sleep(50);
+        metricsManager.recordRebalanceEnded(time.milliseconds());
+
+        // Verify metrics after third rebalance
+        assertEquals(30.0d, 
metrics.metric(metricsManager.rebalanceLatencyAvg).metricValue(),
+                "Average latency should be (10 + 30 + 50) / 3 = 30ms");
+        assertEquals(50.0d, 
metrics.metric(metricsManager.rebalanceLatencyMax).metricValue(),
+                "Max latency should be max(10, 30, 50) = 50ms");
+        assertEquals(90.0d, 
metrics.metric(metricsManager.rebalanceLatencyTotal).metricValue(),
+                "Total latency should be 10 + 30 + 50 = 90ms");
+        assertEquals(3.0d, 
metrics.metric(metricsManager.rebalanceTotal).metricValue());
+    }
+
+    @Test
+    public void testRebalanceRateMetric() {
+
+        // Verify rate metric is registered
+        assertNotNull(metrics.metric(metricsManager.rebalanceRatePerHour));
+
+        // Record 3 rebalances within 30ms total (3 x 10ms)
+        for (int i = 0; i < 3; i++) {
+            metricsManager.recordRebalanceStarted(time.milliseconds());
+            time.sleep(10);
+            metricsManager.recordRebalanceEnded(time.milliseconds());
+        }
+
+        double ratePerHour = (Double) 
metrics.metric(metricsManager.rebalanceRatePerHour).metricValue();
+        assertEquals(360.0d, ratePerHour, 1.0, 

Review Comment:
   It seems that some expected results depend on the default values of 
`MetricConfig` (e.g., `metrics.sample.window.ms`, `metrics.num.samples`). That 
might make the test a bit fragile if those defaults ever change.
   
   I wonder if we could make a small adjustment here to make it more resilient. 
For example:
   
   * Pass a fixed `MetricConfig` in `setup()`. This could still use the same 
values as the current defaults, so the test behavior remains unchanged.
   * Keep the default `MetricConfig` as a field and compute the expected value 
dynamically from its window size, instead of hard-coding the result.
   
   Do you think an adjustment along these lines would make sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to