lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1849018100


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -173,6 +181,7 @@ private void processApplicationEvents() {
                 log.warn("Error processing event {}", t.getMessage(), t);
             }
         }
+        kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs));

Review Comment:
   We probably don't want to record this if there were no events, right? Maybe 
just return early after `drainEvents` if there are none?
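
A minimal sketch of the early return suggested above, assuming a simplified event loop. `ProcessingTimeRecorder` and `EventLoopSketch` are hypothetical stand-ins, not classes from the PR, and events are plain strings here; only the metric method name is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the PR's metrics class; it only remembers recorded values.
class ProcessingTimeRecorder {
    final List<Long> recorded = new ArrayList<>();

    void recordApplicationEventQueueProcessingTime(long elapsedMs) {
        recorded.add(elapsedMs);
    }
}

// Simplified stand-in for the network thread's event loop (events are plain strings).
class EventLoopSketch {
    private final BlockingQueue<String> applicationEventQueue = new LinkedBlockingQueue<>();
    private final ProcessingTimeRecorder metrics;

    EventLoopSketch(ProcessingTimeRecorder metrics) {
        this.metrics = metrics;
    }

    void add(String event) {
        applicationEventQueue.add(event);
    }

    void processApplicationEvents() {
        List<String> events = new ArrayList<>();
        applicationEventQueue.drainTo(events);
        if (events.isEmpty()) {
            return; // nothing was processed, so no processing time is recorded
        }
        long startMs = System.currentTimeMillis();
        for (String event : events) {
            // Real event processing would happen here.
        }
        metrics.recordApplicationEventQueueProcessingTime(System.currentTimeMillis() - startMs);
    }
}
```

With this shape, an idle loop iteration adds no sample, so the recorded processing times are not diluted by empty runs.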



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -173,6 +178,7 @@ private void trySend(final long currentTimeMs) {
             unsent.timer.update(currentTimeMs);
             if (unsent.timer.isExpired()) {
                 iterator.remove();
+                kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - unsent.enqueuedMs()));

Review Comment:
   shouldn't we use `currentTimeMs` instead of `this.time.milliseconds()`? 
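
A rough sketch of what reusing `currentTimeMs` could look like, assuming a simplified queue. `UnsentSketch` and `TrySendSketch` are hypothetical stand-ins, and a plain deadline replaces the real `Timer`; the point is only that the expiry check and the metric share one timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for UnsentRequest: an enqueue timestamp and a deadline.
class UnsentSketch {
    final long enqueuedMs;
    final long deadlineMs;

    UnsentSketch(long enqueuedMs, long deadlineMs) {
        this.enqueuedMs = enqueuedMs;
        this.deadlineMs = deadlineMs;
    }
}

class TrySendSketch {
    final List<UnsentSketch> unsentRequests = new ArrayList<>();
    // Stands in for the metrics sensor; holds every recorded queue time.
    final List<Long> recordedQueueTimesMs = new ArrayList<>();

    void trySend(final long currentTimeMs) {
        Iterator<UnsentSketch> iterator = unsentRequests.iterator();
        while (iterator.hasNext()) {
            UnsentSketch unsent = iterator.next();
            if (currentTimeMs >= unsent.deadlineMs) {
                iterator.remove();
                // Same timestamp for the expiry check and the metric; no second clock read.
                recordedQueueTimesMs.add(currentTimeMs - unsent.enqueuedMs);
            }
        }
    }
}
```

Passing the timestamp in also makes the recorded value deterministic in tests, since it no longer depends on when the clock is read.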



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -73,10 +84,19 @@ public ApplicationEventHandler(final LogContext logContext,
      */
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         applicationEventQueue.add(event);
+        kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size()));
         wakeupNetworkThread();
     }
 
+    public List<ApplicationEvent> drainEvents() {

Review Comment:
   Could you add a Javadoc for this, please?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -183,6 +189,7 @@ private void trySend(final long currentTimeMs) {
                 continue;
             }
             iterator.remove();
+            kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - unsent.enqueuedMs()));

Review Comment:
   `currentTimeMs`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
     private void processApplicationEvents() {
         LinkedList<ApplicationEvent> events = new LinkedList<>();
         applicationEventQueue.drainTo(events);
+        kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));

Review Comment:
   Oh good point, I forgot about that dependency! (feels kind of unexpected 
actually). Given that structure we shouldn't reference AppEventHandler in the 
ConsumerNetworkThread because we would end up with a circular dependency. 
   
   Sorry for the extra work, but I would suggest we revert this back to your 
initial change, without having a drainEvents, and we rethink this class 
structure in a separate jira, to see if it would make sense to decouple the 
AppEventHandler from the network thread (and if so then we could properly add a 
drainEvents, without circular deps). Makes sense? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1913,6 +1923,7 @@ private boolean processBackgroundEvents() {
                     log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
             }
         }
+        kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);

Review Comment:
   We probably don't want to record this if there were no events, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -144,6 +148,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
         this.client.poll(pollTimeoutMs, currentTimeMs);
         maybePropagateMetadataError();
         checkDisconnects(currentTimeMs);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size(), this.time.milliseconds()));

Review Comment:
   Should we use `currentTimeMs` instead of `this.time.milliseconds()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +328,14 @@ Timer timer() {
             return timer;
         }
 
+        void setEnqueuedMs(final long enqueuedMs) {

Review Comment:
   Could you add Javadoc for this, the getter, and the field? Mainly to clarify 
that it's the time when the request was added to the queue (not the duration it 
has spent on the queue, which is another concept we're working with; that's why 
I think it's worth clarifying). We could also consider `enqueueTimeMs` as the name.
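
One possible wording for the Javadoc requested above, shown on a trimmed-down class. `UnsentRequestSketch` is a hypothetical stand-in for `UnsentRequest`, and the accessor names assume the `enqueueTimeMs` rename floated in the comment rather than the names currently in the PR.

```java
// Hypothetical, trimmed-down stand-in for UnsentRequest, using the suggested name.
class UnsentRequestSketch {
    /**
     * Timestamp, in milliseconds, at which this request was added to the unsent queue.
     * This is a point in time, not the duration the request has spent in the queue.
     */
    private long enqueueTimeMs;

    /**
     * Set the time when the request was added to the unsent queue.
     *
     * @param enqueueTimeMs enqueue timestamp in milliseconds
     */
    void setEnqueueTimeMs(final long enqueueTimeMs) {
        this.enqueueTimeMs = enqueueTimeMs;
    }

    /**
     * @return the timestamp, in milliseconds, at which the request was enqueued; the time
     *         spent in the queue is {@code nowMs - enqueueTimeMs()}
     */
    long enqueueTimeMs() {
        return enqueueTimeMs;
    }
}
```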



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -267,7 +272,9 @@ public void addAll(final List<UnsentRequest> requests) {
     public void add(final UnsentRequest r) {
         Objects.requireNonNull(r);
         r.setTimer(this.time, this.requestTimeoutMs);
+        r.setEnqueuedMs(this.time.milliseconds());
         unsentRequests.add(r);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size()));

Review Comment:
   It makes sense to record the unsent requests queue size at the end of a poll 
iteration, after requests were added and removed/sent; that's truly what could 
help spot issues (too many requests being left "unsent" on each run). That 
actually makes me wonder about the value of also recording it on add: would it 
be useful to see the number of requests added on each run, or is the number 
left unsent all that matters?
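
To make the "record only at the end of poll" option concrete, here is a small sketch under simplifying assumptions. `PollSketch` and its `maxSendsPerPoll` limit are hypothetical and exist only so that something can be left unsent; this illustrates one of the two options raised above, not a recommendation for the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PollSketch {
    private final Queue<String> unsentRequests = new ArrayDeque<>();
    private final int maxSendsPerPoll; // hypothetical limit, only to leave requests unsent
    // Stands in for the metrics sensor; one sample per poll iteration.
    final List<Integer> recordedQueueSizes = new ArrayList<>();

    PollSketch(int maxSendsPerPoll) {
        this.maxSendsPerPoll = maxSendsPerPoll;
    }

    void add(String request) {
        unsentRequests.add(request); // no metric recorded on add in this variant
    }

    void poll() {
        for (int i = 0; i < maxSendsPerPoll && !unsentRequests.isEmpty(); i++) {
            unsentRequests.poll(); // pretend the request was sent
        }
        // Record what is still unsent after this iteration's adds and sends.
        recordedQueueSizes.add(unsentRequests.size());
    }
}
```

In this variant each sample answers "how many requests were left behind by this run", which is the signal the comment describes as most useful.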



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -210,6 +217,7 @@ protected void checkDisconnects(final long currentTimeMs) {
             UnsentRequest u = iter.next();
             if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
                 iter.remove();
+                kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(this.time.milliseconds() - u.enqueuedMs()));

Review Comment:
   `currentTimeMs`?



