lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1872198214


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +327,20 @@ Timer timer() {
             return timer;
         }
 
+        /**
+         * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        void setEnqueueTimeMs(final long enqueueTimeMs) {
+            this.enqueueTimeMs = enqueueTimeMs;
+        }
+
+        /**
+         * Return the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        long enqueueTimeMs() {

Review Comment:
   this one is less sensitive, but if it's only used here, as it seems, we could consider making it private too
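   
   For illustration only, a minimal sketch of the narrowed getter (assuming it has no callers outside this class):
   
   ```java
   // sketch: narrow visibility if the accessor is only used within NetworkClientDelegate
   private long enqueueTimeMs() {
       return enqueueTimeMs;
   }
   ```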



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ApplicationEventHandlerTest {
+    private final Time time = new MockTime();
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
+    private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
+    private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
+    private final RequestManagers requestManagers = mock(RequestManagers.class);
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
+
+    @Test
+    public void testRecordApplicationEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
+                     new LogContext(),
+                     time,
+                     applicationEventsQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            PollEvent event = new PollEvent(time.milliseconds());
+
+            // add event
+            applicationEventHandler.add(event);
+            assertEquals(1, (double) metrics.metric(metrics.metricName("application-event-queue-size", "consumer-metrics")).metricValue());

Review Comment:
   could we reuse the metric name constants we already have?
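   
   For example, the assertion could pull the name from the constant instead of repeating the literal (a sketch, assuming `AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME` is accessible from this test; the "consumer-metrics" group name is left as a literal here):
   
   ```java
   // reuse the sensor-name constant rather than the raw string
   assertEquals(1, (double) metrics.metric(
           metrics.metricName(AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, "consumer-metrics")
   ).metricValue());
   ```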



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
     * could occur when processing the events. In such cases, the processor will take a reference to the first
     * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background 
event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
             }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());

Review Comment:
   Interesting. If we agree on what we want, we could just send an update in the KIP email thread to add it to the KIP and here.
   
   To align internally first, I guess we would be interested in the number/avg of expired events, but we need to consider how that metric could spike and become a false alarm in cases like poll(0), right? Should we consider the expiration relevant only if there was a non-zero timeout? Thoughts?
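   
   A rough illustration of that idea (purely hypothetical, names invented here, not part of the PR): only count an expiration when the caller supplied a non-zero timeout, so `poll(0)` cannot inflate the metric.
   
   ```java
   // hypothetical sketch: guard the expiration metric on a non-zero application timeout
   if (applicationTimeoutMs > 0 && eventExpired) {
       asyncConsumerMetrics.recordExpiredEvent(); // invented recording method, for discussion only
   }
   ```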



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ApplicationEventHandlerTest {
+    private final Time time = new MockTime();
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
+    private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
+    private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
+    private final RequestManagers requestManagers = mock(RequestManagers.class);
+    private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
+
+    @Test
+    public void testRecordApplicationEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
+                     new LogContext(),
+                     time,
+                     applicationEventsQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            PollEvent event = new PollEvent(time.milliseconds());
+
+            // add event
+            applicationEventHandler.add(event);

Review Comment:
   ```suggestion
               // add event
               applicationEventHandler.add(new PollEvent(time.milliseconds()));
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -317,6 +327,20 @@ Timer timer() {
             return timer;
         }
 
+        /**
+         * Set the time when the request was enqueued to {@link NetworkClientDelegate#unsentRequests}.
+         */
+        void setEnqueueTimeMs(final long enqueueTimeMs) {

Review Comment:
   this should be private right? 
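   
   A sketch of the narrowed setter, assuming it is only invoked from within `NetworkClientDelegate` (as an inner-class member it stays reachable from the enclosing class even when private):
   
   ```java
   private void setEnqueueTimeMs(final long enqueueTimeMs) {
       this.enqueueTimeMs = enqueueTimeMs;
   }
   ```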



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BackgroundEventHandlerTest {
+    private final BlockingQueue<BackgroundEvent> backgroundEventsQueue = new LinkedBlockingQueue<>();
+
+    @Test
+    public void testRecordBackgroundEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer")) {
+            BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+                backgroundEventsQueue,
+                new MockTime(0),
+                asyncConsumerMetrics);
+            BackgroundEvent event = new ErrorEvent(new Throwable());
+
+            // add event
+            backgroundEventHandler.add(event);

Review Comment:
   ```suggestion
               // add event
               backgroundEventHandler.add(new ErrorEvent(new Throwable()));
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -199,4 +203,57 @@ public void testSendUnsentRequests() {
         consumerNetworkThread.cleanup();
         verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
+
+    @Test
+    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer");
+             ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
+                     new LogContext(),
+                     time,
+                     applicationEventQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            consumerNetworkThread.runOnce();
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue() > 0);
+            assertTrue((double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue() > 0);

Review Comment:
   couldn't we be more precise here (I guess it should be 10 exactly given how 
this is calculated right? not sure if we would need >= in this case)
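   
   For reference, the stricter form could look like the following sketch (assuming only the second `runOnce()` records a sample, so both avg and max land at exactly 10 on the mock clock):
   
   ```java
   assertEquals(10, (double) metrics.metric(metrics.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue());
   assertEquals(10, (double) metrics.metric(metrics.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue());
   ```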



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1951,6 +1952,31 @@ public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws Interru
         }, "Consumer did not throw the expected UnsupportedVersionException on poll");
     }
 
+    @Test
+    public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                mock(ConsumerInterceptors.class),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                mock(SubscriptionState.class),
+                "group-id",
+                "client-id",
+                false);
+        Metrics metrics = consumer.metricsRegistry();
+        AsyncConsumerMetrics kafkaConsumerMetrics = consumer.kafkaConsumerMetrics();
+
+        ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
+        event.setEnqueuedMs(time.milliseconds());
+        backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+        time.sleep(10);
+        consumer.processBackgroundEvents();
+        assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue());
+        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() > 0);
+        assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() > 0);

Review Comment:
   couldn't we be more precise here and expect >= 10? 
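   
   A sketch of the tighter check, assuming the 10 ms from `time.sleep(10)` is the only queue time recorded:
   
   ```java
   assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue() >= 10);
   assertTrue((double) metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue() >= 10);
   ```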



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+    private final Metrics metrics;
+
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+    public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+    public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final Sensor applicationEventQueueSizeSensor;
+    private final Sensor applicationEventQueueTimeSensor;
+    private final Sensor applicationEventQueueProcessingTimeSensor;
+    private final Sensor backgroundEventQueueSizeSensor;
+    private final Sensor backgroundEventQueueTimeSensor;
+    private final Sensor backgroundEventQueueProcessingTimeSensor;
+    private final Sensor unsentRequestsQueueSizeSensor;
+    private final Sensor unsentRequestsQueueTimeSensor;
+
+    public AsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+        super(metrics, metricGrpPrefix);
+
+        this.metrics = metrics;
+        final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX;
+        this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg",
+                metricGroupName,
+                "The average time taken, in milliseconds, between each poll in the network thread."),
+            new Avg());
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max",
+                metricGroupName,
+                "The maximum time taken, in milliseconds, between each poll in the network thread."),
+            new Max());
+
+        this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size",
+                metricGroupName,
+                "The current number of events in the consumer network application event queue."),

Review Comment:
   nit: I guess that, some time from now, even those of us who know this by heart will get tricked about whether this is the outgoing or the incoming queue. Should we be more explicit with something like
   
   ```suggestion
                   "The current number of events in the queue to send from the application thread to the background thread."),
   ```
   
   (and then we can consistently have the flipped version of the message for the `background-event-queue-size` metric)
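   
   For symmetry, the background-side wording might then read something like this (sketch only, phrasing open for discussion; the stat is assumed to mirror the application-side size sensor):
   
   ```java
   this.backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size",
           metricGroupName,
           "The current number of events in the queue to send from the background thread to the application thread."),
       new Value()); // assuming the same stat as the application-event-queue-size sensor
   ```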



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
