AndrewJSchofield commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1854114243


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -42,26 +44,32 @@
 public class ApplicationEventHandler implements Closeable {
 
     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;

Review Comment:
   I wonder if we could actually make a class for the application event queue, rather than using `BlockingQueue` directly. That way, we could do things like incorporate the queue-size metric into the queue itself, rather than having to update the metrics in the caller.
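
   For illustration, here's a minimal sketch of the kind of wrapper I mean (the class name and the `recordApplicationEventQueueSize` method are just assumptions for the example):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Sketch only: the queue owns its size metric, so callers no longer
   // have to remember to update it after every add/drain.
   public class ApplicationEventQueue {

       private final BlockingQueue<ApplicationEvent> queue = new LinkedBlockingQueue<>();
       private final KafkaAsyncConsumerMetrics metrics;

       public ApplicationEventQueue(KafkaAsyncConsumerMetrics metrics) {
           this.metrics = metrics;
       }

       public void add(ApplicationEvent event) {
           queue.add(event);
           metrics.recordApplicationEventQueueSize(queue.size());
       }

       public List<ApplicationEvent> drain() {
           List<ApplicationEvent> events = new ArrayList<>();
           queue.drainTo(events);
           metrics.recordApplicationEventQueueSize(queue.size());
           return events;
       }
   }
   ```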



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {

Review Comment:
   These metrics really measure the processing of events sent between the application and background threads. `AsyncKafkaConsumer` and `ShareConsumerImpl` both use the same architecture, and the same metrics would apply equally well to both. So, I suggest:
   
   * Rename this to `AsyncConsumerMetrics`.
   * Do not inherit from `KafkaConsumerMetrics`.
   * Supply an instance of this class when constructing the `NetworkClientDelegate` in `ShareConsumerImpl`, as sketched below. That instance should not be `Optional.empty()`. In fact, I don't think it should be optional at all.
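
   Roughly this shape, for example (a sketch only; the constructor arguments and sensor names are illustrative):

   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;

   // Standalone metrics class, not inheriting from KafkaConsumerMetrics,
   // so that AsyncKafkaConsumer and ShareConsumerImpl can share it.
   public class AsyncConsumerMetrics implements AutoCloseable {

       private final Metrics metrics;
       private final Sensor applicationEventQueueSizeSensor;

       public AsyncConsumerMetrics(Metrics metrics, String groupName) {
           this.metrics = metrics;
           this.applicationEventQueueSizeSensor = metrics.sensor("application-event-queue-size");
           // ... register the remaining event-queue sensors as in this PR ...
       }

       @Override
       public void close() {
           metrics.removeSensor(applicationEventQueueSizeSensor.name());
       }
   }
   ```

   `ShareConsumerImpl` would then construct one of these and pass it directly when building its `NetworkClientDelegate`.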



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }

Review Comment:
   I take the point about `hashCode()` or `equals()`, but we probably should be able to see the enqueued time in the string representation of the events.
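
   For example, something along these lines (assuming the `toStringBase()` pattern these event classes already use):

   ```java
   // Sketch: surface the enqueued time wherever the event is logged
   protected String toStringBase() {
       return "id=" + id + ", enqueuedMs=" + enqueuedMs;
   }
   ```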



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -60,21 +61,24 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
     private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
     private final Supplier<RequestManagers> requestManagersSupplier;
+    private final Optional<KafkaAsyncConsumerMetrics> kafkaAsyncConsumerMetrics;
     private ApplicationEventProcessor applicationEventProcessor;
     private NetworkClientDelegate networkClientDelegate;
     private RequestManagers requestManagers;
     private volatile boolean running;
     private final IdempotentCloser closer = new IdempotentCloser();
     private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
     private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
+    private long lastPollTimeMs = 0L;
 
     public ConsumerNetworkThread(LogContext logContext,
                                  Time time,
                                  BlockingQueue<ApplicationEvent> applicationEventQueue,
                                  CompletableEventReaper applicationEventReaper,
                                  Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                  Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
-                                 Supplier<RequestManagers> requestManagersSupplier) {
+                                 Supplier<RequestManagers> requestManagersSupplier,
+                                 Optional<KafkaAsyncConsumerMetrics> kafkaAsyncConsumerMetrics) {

Review Comment:
   As mentioned in another comment, this should not be optional. It's general-purpose enough to be used by all users of `ConsumerNetworkThread`.
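
   That is, something like this (using the `AsyncConsumerMetrics` name suggested above):

   ```java
   public ConsumerNetworkThread(LogContext logContext,
                                Time time,
                                BlockingQueue<ApplicationEvent> applicationEventQueue,
                                CompletableEventReaper applicationEventReaper,
                                Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                Supplier<RequestManagers> requestManagersSupplier,
                                AsyncConsumerMetrics asyncConsumerMetrics) { // not Optional
       // ...
   }
   ```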



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()));

Review Comment:
   Yes, I agree.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
             }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());

Review Comment:
   How are we going to account for events which expired and are removed from the queue by the event reaper? They probably ought to be included in the metrics.
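
   For example, if the reaper handed back the events it expired (a hypothetical return value for `reap()`), their queue time could still be recorded:

   ```java
   // Hypothetical sketch: reap() returns the expired events so their time
   // in the queue is still reflected in the metrics.
   List<CompletableEvent<?>> expired = backgroundEventReaper.reap(time.milliseconds());
   for (CompletableEvent<?> expiredEvent : expired) {
       if (expiredEvent instanceof BackgroundEvent) {
           BackgroundEvent event = (BackgroundEvent) expiredEvent;
           kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
       }
   }
   ```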



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
