jsancio commented on code in PR #20422:
URL: https://github.com/apache/kafka/pull/20422#discussion_r2349311699


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -278,21 +280,24 @@ private void handleEvents() {
                         remove(toRun);
                         continue;
                     }
-                    if (awaitNs == Long.MAX_VALUE) {
-                        try {
+
+                    long startIdleNs = time.nanoseconds();
+                    try {
+                        if (awaitNs == Long.MAX_VALUE) {
                             cond.await();
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a new 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
-                        }
-                    } else {
-                        try {
+                        } else {
                             cond.awaitNanos(awaitNs);
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a deferred 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
+                        }
+                    } catch (InterruptedException e) {
+
+                        log.warn("Interrupted while waiting for a {} event. " +
+                                "Shutting down event queue", (awaitNs == 
Long.MAX_VALUE) ? "new" : "deferred");
+                        interrupted = true;
+                    } finally {
+                        if (idleTimeCallback != null) {
+                            long idleNs = Math.max(time.nanoseconds() - 
startIdleNs, 0);
+                            long idleMs = 
TimeUnit.NANOSECONDS.toMillis(idleNs);

Review Comment:
   Please use `time.milliseconds` since you are converting to milliseconds 
anyways.



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -440,6 +445,12 @@ int size() {
      */
     private boolean interrupted;
 
+    /**
+     * Optional callback for queue idle time tracking.
+     */
+    private Consumer<Long> idleTimeCallback;

Review Comment:
   This needs to be final. Please pass the idle time callback through the 
constructor.



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -278,21 +280,24 @@ private void handleEvents() {
                         remove(toRun);
                         continue;
                     }
-                    if (awaitNs == Long.MAX_VALUE) {
-                        try {
+
+                    long startIdleNs = time.nanoseconds();
+                    try {
+                        if (awaitNs == Long.MAX_VALUE) {
                             cond.await();
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a new 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
-                        }
-                    } else {
-                        try {
+                        } else {
                             cond.awaitNanos(awaitNs);
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a deferred 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
+                        }
+                    } catch (InterruptedException e) {
+
+                        log.warn("Interrupted while waiting for a {} event. " +
+                                "Shutting down event queue", (awaitNs == 
Long.MAX_VALUE) ? "new" : "deferred");
+                        interrupted = true;
+                    } finally {
+                        if (idleTimeCallback != null) {

Review Comment:
   Instead of checking for null, let's have a noop consumer if the user don't 
specify a consumer callback.



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -278,21 +280,24 @@ private void handleEvents() {
                         remove(toRun);
                         continue;
                     }
-                    if (awaitNs == Long.MAX_VALUE) {
-                        try {
+
+                    long startIdleNs = time.nanoseconds();
+                    try {
+                        if (awaitNs == Long.MAX_VALUE) {
                             cond.await();
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a new 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
-                        }
-                    } else {
-                        try {
+                        } else {
                             cond.awaitNanos(awaitNs);
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a deferred 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
+                        }
+                    } catch (InterruptedException e) {
+
+                        log.warn("Interrupted while waiting for a {} event. " +
+                                "Shutting down event queue", (awaitNs == 
Long.MAX_VALUE) ? "new" : "deferred");
+                        interrupted = true;
+                    } finally {
+                        if (idleTimeCallback != null) {
+                            long idleNs = Math.max(time.nanoseconds() - 
startIdleNs, 0);
+                            long idleMs = 
TimeUnit.NANOSECONDS.toMillis(idleNs);
+                            idleTimeCallback.accept(idleMs);

Review Comment:
   Can we add a test in KafkaEventQueueTest for this feature?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to