jsancio commented on code in PR #20422: URL: https://github.com/apache/kafka/pull/20422#discussion_r2349311699
########## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ########## @@ -278,21 +280,24 @@ private void handleEvents() { remove(toRun); continue; } - if (awaitNs == Long.MAX_VALUE) { - try { + + long startIdleNs = time.nanoseconds(); + try { + if (awaitNs == Long.MAX_VALUE) { cond.await(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a new event. " + - "Shutting down event queue"); - interrupted = true; - } - } else { - try { + } else { cond.awaitNanos(awaitNs); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a deferred event. " + - "Shutting down event queue"); - interrupted = true; + } + } catch (InterruptedException e) { + + log.warn("Interrupted while waiting for a {} event. " + + "Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred"); + interrupted = true; + } finally { + if (idleTimeCallback != null) { + long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0); + long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs); Review Comment: Please use `time.milliseconds` since you are converting to milliseconds anyway. ########## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ########## @@ -440,6 +445,12 @@ int size() { */ private boolean interrupted; + /** + * Optional callback for queue idle time tracking. + */ + private Consumer<Long> idleTimeCallback; Review Comment: This needs to be final. Please pass the idle time callback through the constructor. ########## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ########## @@ -278,21 +280,24 @@ private void handleEvents() { remove(toRun); continue; } - if (awaitNs == Long.MAX_VALUE) { - try { + + long startIdleNs = time.nanoseconds(); + try { + if (awaitNs == Long.MAX_VALUE) { cond.await(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a new event. 
" + - "Shutting down event queue"); - interrupted = true; - } - } else { - try { + } else { cond.awaitNanos(awaitNs); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a deferred event. " + - "Shutting down event queue"); - interrupted = true; + } + } catch (InterruptedException e) { + + log.warn("Interrupted while waiting for a {} event. " + + "Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred"); + interrupted = true; + } finally { + if (idleTimeCallback != null) { Review Comment: Instead of checking for null, let's have a no-op consumer if the user doesn't specify a consumer callback. ########## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ########## @@ -278,21 +280,24 @@ private void handleEvents() { remove(toRun); continue; } - if (awaitNs == Long.MAX_VALUE) { - try { + + long startIdleNs = time.nanoseconds(); + try { + if (awaitNs == Long.MAX_VALUE) { cond.await(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a new event. " + - "Shutting down event queue"); - interrupted = true; - } - } else { - try { + } else { cond.awaitNanos(awaitNs); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for a deferred event. " + - "Shutting down event queue"); - interrupted = true; + } + } catch (InterruptedException e) { + + log.warn("Interrupted while waiting for a {} event. " + + "Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred"); + interrupted = true; + } finally { + if (idleTimeCallback != null) { + long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0); + long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs); + idleTimeCallback.accept(idleMs); Review Comment: Can we add a test in KafkaEventQueueTest for this feature? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org