jsancio commented on code in PR #20422:
URL: https://github.com/apache/kafka/pull/20422#discussion_r2376264235
##########
metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java:
##########
@@ -189,6 +190,30 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}
+ @Test
+ public void testAvgIdleRatio() {
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
+ Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
+
+ // No idle time recorded yet
+ assertEquals(1.0, avgIdleRatio.value());
Review Comment:
When comparing double or float values, we should specify a delta parameter,
since floating-point values are not exact.
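
For illustration, here is a minimal plain-Java sketch of why a tolerance helps. `DoubleDeltaSketch` and `approxEquals` are hypothetical names used only for this example; they are not part of the PR. JUnit 5's `Assertions.assertEquals(double expected, double actual, double delta)` overload performs the same kind of tolerance check, so passing a small delta there should be enough.

```java
public class DoubleDeltaSketch {
    // Hypothetical helper mirroring a delta-based assertion:
    // two doubles count as equal when they differ by at most delta.
    public static boolean approxEquals(double expected, double actual, double delta) {
        return Math.abs(expected - actual) <= delta;
    }

    public static void main(String[] args) {
        // 0.1 and 0.2 are not exactly representable in binary floating point,
        // so their sum is slightly off from 0.3.
        double sum = 0.1 + 0.2;

        System.out.println(sum == 0.3);                   // exact comparison: prints false
        System.out.println(approxEquals(0.3, sum, 1e-9)); // comparison with a tolerance: prints true
    }
}
```

The delta value (1e-9 here) is an assumption; it should be chosen to match the precision the metric is expected to have.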
##########
docs/upgrade.html:
##########
@@ -438,12 +437,12 @@ <h5><a id="upgrade_servers_400_notable"
href="#upgrade_servers_400_notable">Nota
<li>The <code>--whitelist</code> option was removed
from the <code>kafka-console-consumer</code> command line tool.
Please use <code>--include</code> instead.
</li>
- <li>Redirections from the old tools packages have been
removed:
- <code>kafka.admin.FeatureCommand</code>,
- <code>kafka.tools.ClusterTool</code>,
+ <li>Redirections from the old tools packages have been
removed:
+ <code>kafka.admin.FeatureCommand</code>,
+ <code>kafka.tools.ClusterTool</code>,
<code>kafka.tools.EndToEndLatency</code>,
- <code>kafka.tools.StateChangeLogMerger</code>,
- <code>kafka.tools.StreamsResetter</code>,
+ <code>kafka.tools.StateChangeLogMerger</code>,
+ <code>kafka.tools.StreamsResetter</code>,
Review Comment:
You have an extra space at the end of these lines.
##########
server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java:
##########
@@ -424,4 +424,50 @@ public void testInterruptedWithDeferredEvents() throws Exception {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}
+
+ @Test
+ public void testIdleTimeCallback() throws Exception {
+ MockTime time = new MockTime();
+ AtomicLong lastIdleTimeMs = new AtomicLong(0);
+
+ try (KafkaEventQueue queue = new KafkaEventQueue(
+ time,
+ logContext,
+ "testIdleTimeCallback",
+ EventQueue.VoidEvent.INSTANCE,
+ lastIdleTimeMs::set)) {
+ time.sleep(2);
+ assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
+
+ // Test 1: Two events with a wait in between using FutureEvent
+ CompletableFuture<String> event1 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event1, () -> {
+ time.sleep(1);
+ return "event1-processed";
+ }));
+ assertEquals("event1-processed", event1.get());
+
+ long waitTime5Ms = 5;
+ time.sleep(waitTime5Ms);
+ CompletableFuture<String> event2 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event2, () -> {
+ time.sleep(1);
+ return "event2-processed";
+ }));
+ assertEquals("event2-processed", event2.get());
+ assertTrue(lastIdleTimeMs.get() >= waitTime5Ms,
+ "Idle time should be at least " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
+
+ // Test 2: Deferred event
+ long waitTime2Ms = 2;
+ CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
+ queue.scheduleDeferred("deferred2",
+ __ -> OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
+ () -> deferredEvent2.complete(null));
+ time.sleep(waitTime2Ms);
+ deferredEvent2.get();
+ assertTrue(lastIdleTimeMs.get() >= waitTime2Ms,
+ "Idle time should be at least " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
Review Comment:
Same question as on the 5ms assertion earlier in this test: why isn't it exactly 2ms?
##########
metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java:
##########
@@ -189,6 +190,30 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}
+ @Test
+ public void testAvgIdleRatio() {
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
+ Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
+
+ // No idle time recorded yet
+ assertEquals(1.0, avgIdleRatio.value());
+
+ metrics.updateIdleTime(10);
Review Comment:
The `value()` of the avg idle ratio is .5 because this recording is dropped.
Please document this.
##########
server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java:
##########
@@ -424,4 +424,50 @@ public void testInterruptedWithDeferredEvents() throws Exception {
assertEquals(InterruptedException.class, ieTrapper2.exception.get().getClass());
}
}
+
+ @Test
+ public void testIdleTimeCallback() throws Exception {
+ MockTime time = new MockTime();
+ AtomicLong lastIdleTimeMs = new AtomicLong(0);
+
+ try (KafkaEventQueue queue = new KafkaEventQueue(
+ time,
+ logContext,
+ "testIdleTimeCallback",
+ EventQueue.VoidEvent.INSTANCE,
+ lastIdleTimeMs::set)) {
+ time.sleep(2);
+ assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms");
+
+ // Test 1: Two events with a wait in between using FutureEvent
+ CompletableFuture<String> event1 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event1, () -> {
+ time.sleep(1);
+ return "event1-processed";
+ }));
+ assertEquals("event1-processed", event1.get());
+
+ long waitTime5Ms = 5;
+ time.sleep(waitTime5Ms);
+ CompletableFuture<String> event2 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event2, () -> {
+ time.sleep(1);
+ return "event2-processed";
+ }));
+ assertEquals("event2-processed", event2.get());
+ assertTrue(lastIdleTimeMs.get() >= waitTime5Ms,
+ "Idle time should be at least " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
Review Comment:
Why isn't it exactly 5ms?
##########
docs/upgrade.html:
##########
@@ -507,7 +506,7 @@ <h5><a id="upgrade_servers_400_notable"
href="#upgrade_servers_400_notable">Nota
</li>
<li>The deprecated
<code>sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>,
String)</code> method has been removed from the Producer API.
</li>
- <li>The default <code>linger.ms</code> changed from 0
to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically
result in
+ <li>The default <code>linger.ms</code> changed from 0
to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically
result in
Review Comment:
Extra space.