kirktrue commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1800283720


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1740,14 +1742,19 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
         LinkedList<BackgroundEvent> events = new LinkedList<>();
         backgroundEventQueue.drainTo(events);
+        kafkaConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
 
         for (BackgroundEvent event : events) {
+            kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.addedToQueueMs());
+            long startMs = time.milliseconds();

Review Comment:
   Is this metric meant to measure how long it takes to process the whole 
background event queue, or how long it takes to process each background event? 
If it's the latter, we should update the metric inside the loop, once per event; 
if it's the former, we should update it once, outside the loop.
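
   To make the two readings concrete, here is a minimal sketch. It is not the 
PR's code: `ProcessingTimeSketch`, `perEventMs`, and `perDrainMs` are 
hypothetical names, and a `LongSupplier` stands in for the consumer's clock.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongSupplier;

   // Hypothetical sketch; names are illustrative and not taken from the PR.
   class ProcessingTimeSketch {
       final List<Long> perEventMs = new ArrayList<>(); // one sample per event
       final List<Long> perDrainMs = new ArrayList<>(); // one sample per drained batch

       void process(List<Runnable> events, LongSupplier clockMs) {
           long drainStartMs = clockMs.getAsLong();
           for (Runnable event : events) {
               long startMs = clockMs.getAsLong();
               event.run();
               // "Per event" reading: record inside the loop.
               perEventMs.add(clockMs.getAsLong() - startMs);
           }
           // "Whole queue" reading: record once, outside the loop.
           perDrainMs.add(clockMs.getAsLong() - drainStartMs);
       }
   }
   ```

   The first list grows by one sample per event, the second by one sample per 
drain, so the metric's name and description should say which one is reported.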



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -85,6 +88,17 @@ public NetworkClientDelegate(
         this.unsentRequests = new ArrayDeque<>();
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public NetworkClientDelegate(
+            final Time time,
+            final ConsumerConfig config,
+            final LogContext logContext,
+            final KafkaClient client,
+            final Metadata metadata,
+            final BackgroundEventHandler backgroundEventHandler) {
+        this(time, config, logContext, client, metadata, backgroundEventHandler, Optional.empty());

Review Comment:
   Is it possible to remove this constructor? Can't callers invoke the existing 
constructor with `Optional.empty()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -73,7 +90,9 @@ public ApplicationEventHandler(final LogContext logContext,
      */
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
+        event.setAddedToQueueMs(System.currentTimeMillis());

Review Comment:
   This needs to get the current time in milliseconds from the `Time` object 
that was passed in to the constructor.
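
   A minimal sketch of the idea, under stated assumptions: `TimeSource` is a 
hypothetical stand-in for Kafka's `Time` (only `milliseconds()` is modeled), and 
`SketchEvent` and `SketchEventHandler` are simplified, illustrative classes 
rather than the real handler.

   ```java
   import java.util.Objects;
   import java.util.Queue;

   // Hypothetical stand-in for Kafka's Time; only milliseconds() is modeled.
   interface TimeSource {
       long milliseconds();
   }

   // Hypothetical, simplified event that carries its enqueue timestamp.
   class SketchEvent {
       private long enqueuedMs;

       void setEnqueuedMs(long enqueuedMs) {
           this.enqueuedMs = enqueuedMs;
       }

       long enqueuedMs() {
           return enqueuedMs;
       }
   }

   // The clock is injected instead of calling System.currentTimeMillis().
   class SketchEventHandler {
       private final TimeSource time;
       private final Queue<SketchEvent> queue;

       SketchEventHandler(TimeSource time, Queue<SketchEvent> queue) {
           this.time = Objects.requireNonNull(time, "time must be non-null");
           this.queue = Objects.requireNonNull(queue, "queue must be non-null");
       }

       void add(SketchEvent event) {
           Objects.requireNonNull(event, "event must be non-null");
           // Timestamp comes from the injected clock, so tests can control it.
           event.setEnqueuedMs(time.milliseconds());
           queue.add(event);
       }
   }
   ```

   With the clock injected, a test can supply a fixed or mock time and assert 
on the recorded queue time deterministically.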



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setAddedToQueueMs(System.currentTimeMillis());

Review Comment:
   Same comment here: we need to update the constructor to provide a `Time` 
object and then use that here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -139,6 +155,9 @@ void runOnce() {
         processApplicationEvents();
 
         final long currentTimeMs = time.milliseconds();
+        final long timeSinceLastPollMs = lastPollTimeMs != 0L ? currentTimeMs - lastPollTimeMs : currentTimeMs;

Review Comment:
   On the first invocation of `runOnce()`, `lastPollTimeMs` is still `0`, so 
`timeSinceLastPollMs` will be the full epoch timestamp, something like 
`1728954137284`. Is that intended?
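
   To illustrate, here is a small sketch that isolates the expression from the 
diff. `PollTimeSketch` and both method names are hypothetical, and 
`zeroOnFirstPoll` is only one possible alternative, not necessarily the right 
fix for this PR.

   ```java
   // Hypothetical helper that isolates the computation from the diff.
   class PollTimeSketch {
       // As written in the diff: falls back to the wall-clock time on the first call.
       static long asInDiff(long lastPollTimeMs, long currentTimeMs) {
           return lastPollTimeMs != 0L ? currentTimeMs - lastPollTimeMs : currentTimeMs;
       }

       // One possible alternative (an assumption): report 0 until a previous poll exists.
       static long zeroOnFirstPoll(long lastPollTimeMs, long currentTimeMs) {
           return lastPollTimeMs != 0L ? currentTimeMs - lastPollTimeMs : 0L;
       }
   }
   ```

   With `lastPollTimeMs == 0`, `asInDiff` returns `currentTimeMs` unchanged, 
which would record a very large first sample in the metric.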



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +335,14 @@ Timer timer() {
             return timer;
         }
 
+        void setAddedToQueueMs(final long addedToQueueMs) {
+            this.addedToQueueMs = addedToQueueMs;
+        }
+
+        long addedToQueueMs() {
+            return addedToQueueMs;
+        }
+

Review Comment:
   I know it's a bit nit-picky, but can we change it to:
   ```suggestion
           void setEnqueuedMs(final long enqueuedMs) {
               this.enqueuedMs = enqueuedMs;
           }
   
           long enqueuedMs() {
               return enqueuedMs;
           }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -84,6 +88,18 @@ public ConsumerNetworkThread(LogContext logContext,
         this.networkClientDelegateSupplier = networkClientDelegateSupplier;
         this.requestManagersSupplier = requestManagersSupplier;
         this.running = true;
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public ConsumerNetworkThread(LogContext logContext,
+                                 Time time,
+                                 BlockingQueue<ApplicationEvent> applicationEventQueue,
+                                 CompletableEventReaper applicationEventReaper,
+                                 Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
+                                 Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+                                 Supplier<RequestManagers> requestManagersSupplier) {
+        this(logContext, time, applicationEventQueue, applicationEventReaper, applicationEventProcessorSupplier,
+            networkClientDelegateSupplier, requestManagersSupplier, Optional.empty());

Review Comment:
   Is it possible to remove this constructor? Can't we call the existing 
constructor with `Optional.empty()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -30,9 +32,16 @@
 public class BackgroundEventHandler {
 
     private final Queue<BackgroundEvent> backgroundEventQueue;
+    private final Optional<KafkaConsumerMetrics> kafkaConsumerMetrics;
 
-    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
+    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue,
+                                  final Optional<KafkaConsumerMetrics> kafkaConsumerMetrics) {
         this.backgroundEventQueue = backgroundEventQueue;
+        this.kafkaConsumerMetrics = kafkaConsumerMetrics;
+    }
+
+    public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) {
+        this(backgroundEventQueue, Optional.empty());

Review Comment:
   Same as with the other constructors: I'd rather make the callers pass in 
`Optional.empty()`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java:
##########
@@ -51,6 +52,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setAddedToQueueMs(long addedToQueueMs) {
+        this.addedToQueueMs = addedToQueueMs;
+    }
+
+    public long addedToQueueMs() {
+        return addedToQueueMs;
+    }

Review Comment:
   ```suggestion
       public void setEnqueuedMs(long enqueuedMs) {
           this.enqueuedMs = enqueuedMs;
       }
   
       public long enqueuedMs() {
           return enqueuedMs;
       }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setAddedToQueueMs(long addedToQueueMs) {
+        this.addedToQueueMs = addedToQueueMs;
+    }
+
+    public long addedToQueueMs() {
+        return addedToQueueMs;
+    }
+

Review Comment:
   ```suggestion
       public void setEnqueuedMs(long enqueuedMs) {
           this.enqueuedMs = enqueuedMs;
       }
   
       public long enqueuedMs() {
           return enqueuedMs;
       }
   ```


