lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1847318144


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -42,6 +51,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue)
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
+        event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()));

Review Comment:
   This change makes sense to me, but it makes me wonder if we should push it 
further. It would be helpful to keep all the updates for this queue size metric 
in the component that holds the queue, so we can easily maintain and track how 
"add" and "remove/drain" update that metric.
   
   If this component also exposed a drain method, we could call it from 
`processBackgroundEvents` instead of manually draining the queue and recording 
the metric there. What do you think?
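   
   A minimal sketch of that encapsulation, for illustration only. The class 
`SketchEventHandler`, its `drain()` method, and the `IntConsumer` used in place 
of the real metrics object are hypothetical names, not taken from the PR:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical stand-in for the handler that owns the queue: every operation
// that changes the queue size also records the size metric, in one place.
class SketchEventHandler<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    // Assumed stand-in for a call such as metrics.recordBackgroundEventQueueSize(size).
    private final IntConsumer queueSizeRecorder;

    SketchEventHandler(IntConsumer queueSizeRecorder) {
        this.queueSizeRecorder = Objects.requireNonNull(queueSizeRecorder);
    }

    // Enqueue the event, then record the new queue size.
    void add(E event) {
        Objects.requireNonNull(event, "event must be non-null");
        queue.add(event);
        queueSizeRecorder.accept(queue.size());
    }

    // Move all queued events into a list, then record the size left behind.
    List<E> drain() {
        List<E> events = new LinkedList<>();
        queue.drainTo(events);
        queueSizeRecorder.accept(queue.size());
        return events;
    }
}
```

   With something like this, the caller would only ask for `drain()` and would 
no longer need to touch the size metric itself.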



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
     private void processApplicationEvents() {
         LinkedList<ApplicationEvent> events = new LinkedList<>();
         applicationEventQueue.drainTo(events);
+        kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));
 
         for (ApplicationEvent event : events) {
+            long startMs = time.milliseconds();
+            kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs()));
             try {
                 if (event instanceof CompletableEvent)
                     applicationEventReaper.add((CompletableEvent<?>) event);
 
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
                 log.warn("Error processing event {}", t.getMessage(), t);
+            } finally {
+                kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueProcessingTime(time.milliseconds() - startMs));

Review Comment:
   Is this the best place to record this? From the description of the metric I 
understand that we want to measure the time "that the consumer network takes to 
process **all available application events**". So wouldn't it be simpler to 
record the metric once per `runOnce` instead of recording it N times on each 
run (start time taken right before the loop over events, and recording right 
after the loop)?
   
   I went to the KIP discussion thread to double-check this interpretation, and 
this was the intention behind what was proposed (by me, as I just discovered, 
hehe).
   > LM3. Thinking about the actual usage of 
"time-between-network-thread-poll-xxx" metric, I imagine it would be helpful to 
know more about what could be impacting it. As I see it, the network thread 
cadence could be mainly impacted by: 1- app event processing (generate 
requests), 2- network client poll (actual send/receive). For 2, the new 
consumer reuses the same component as the legacy one, but 1 is specific to the 
new consumer, so what about a metric for application-event-processing-time-ms 
(we could consider avg I would say). It would be the time that the network 
thread takes to process all available events on each run.
   
   What do you think?
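   
   A rough sketch of the once-per-run timing, under stated assumptions: 
`SketchBatchTimer`, the `LongSupplier` clock, and the `LongConsumer` recorder 
are hypothetical stand-ins for `time.milliseconds()` and the metrics call, not 
code from the PR:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical helper: times one whole pass over the drained events and
// records a single sample per run, instead of one sample per event.
class SketchBatchTimer {
    static <E> void processAll(List<E> events,
                               Consumer<E> processor,
                               LongSupplier clockMs,                 // assumed stand-in for time.milliseconds()
                               LongConsumer processingTimeRecorder) { // assumed stand-in for the metrics call
        long startMs = clockMs.getAsLong();
        try {
            for (E event : events) {
                try {
                    processor.accept(event);
                } catch (RuntimeException e) {
                    // A failing event must not prevent the remaining events from running.
                    System.err.println("Error processing event: " + e.getMessage());
                }
            }
        } finally {
            // One sample covering all events processed in this run.
            processingTimeRecorder.accept(clockMs.getAsLong() - startMs);
        }
    }
}
```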



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -162,15 +171,20 @@ void runOnce() {
     private void processApplicationEvents() {
         LinkedList<ApplicationEvent> events = new LinkedList<>();
         applicationEventQueue.drainTo(events);
+        kafkaAsyncConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0));

Review Comment:
   To keep the symmetry with the background events, would it make sense to 
encapsulate these actions in the `ApplicationEventHandler`, so that this 
component stays responsible for adding to and draining the queue (including the 
metric updates related to those operations)?
   
   It would mean that this `ConsumerNetworkThread` keeps a reference to the 
`ApplicationEventHandler` that has the queue (instead of holding the queue 
directly like it does now), but that handler is already available, so I guess 
we just need to pass it in the constructor instead of the queue. What do you 
think?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+    private final Metrics metrics;
+
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+    public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+    public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final  Sensor applicationEventQueueSizeSensor;
+    private final Sensor applicationEventQueueTimeSensor;
+    private final Sensor applicationEventQueueProcessingTimeSensor;
+    private final Sensor backgroundEventQueueSizeSensor;
+    private final Sensor backgroundEventQueueTimeSensor;
+    private final Sensor backgroundEventQueueProcessingTimeSensor;
+    private final Sensor unsentRequestsQueueSizeSensor;
+    private final Sensor unsentRequestsQueueTimeSensor;
+
+    public KafkaAsyncConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+        super(metrics, metricGrpPrefix);
+
+        this.metrics = metrics;
+        final String metricGroupName = metricGrpPrefix + CONSUMER_METRICS_SUFFIX;
+        this.timeBetweenNetworkThreadPollSensor = metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-avg",
+                metricGroupName,
+                "The average time taken, in milliseconds, between each poll in the network thread."),
+            new Avg());
+        this.timeBetweenNetworkThreadPollSensor.add(metrics.metricName("time-between-network-thread-poll-max",
+                metricGroupName,
+                "The maximum time taken, in milliseconds, between each poll in the network thread."),
+            new Max());
+
+        this.applicationEventQueueSizeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.applicationEventQueueSizeSensor.add(metrics.metricName("application-event-queue-size",
+                metricGroupName,
+                "The current number of events in the consumer network application event queue."),
+            new Value());
+
+        this.applicationEventQueueTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-avg",
+                metricGroupName,
+                "The average time, in milliseconds, that application events are taking to be dequeued."),
+            new Avg());
+        this.applicationEventQueueTimeSensor.add(metrics.metricName("application-event-queue-time-max",
+                metricGroupName,
+                "The maximum time, in milliseconds, that an application event took to be dequeued."),
+            new Max());
+
+        this.applicationEventQueueProcessingTimeSensor = metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-avg",
+                metricGroupName,
+                "The average time, in milliseconds, that the consumer network takes to process all available application events."),
+            new Avg());
+        this.applicationEventQueueProcessingTimeSensor.add(metrics.metricName("application-event-queue-processing-time-max",
+                metricGroupName,
+                "The maximum time, in milliseconds, that the consumer network took to process all available application events."),
+            new Max());
+
+        this.unsentRequestsQueueSizeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME);
+        this.unsentRequestsQueueSizeSensor.add(metrics.metricName("unsent-requests-queue-size",
+                metricGroupName,
+                "The current number of unsent requests in the consumer network."),
+            new Value());
+
+        this.unsentRequestsQueueTimeSensor = metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME);
+        this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-avg",
+                metricGroupName,
+                "The average time, in milliseconds, that requests are taking to be sent in the consumer network."),
+            new Avg());
+        this.unsentRequestsQueueTimeSensor.add(metrics.metricName("unsent-requests-queue-time-max",
+                metricGroupName,
+                "The maximum time, in milliseconds, that a request remained unsent in the consumer network."),
+            new Max());
+
+        this.backgroundEventQueueSizeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.backgroundEventQueueSizeSensor.add(metrics.metricName("background-event-queue-size",
+                metricGroupName,
+                "The current number of events in the consumer background event queue."),
+            new Value());
+
+        this.backgroundEventQueueTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-avg",
+                metricGroupName,
+                "The average time, in milliseconds, that background events are taking to be dequeued."),
+            new Avg());
+        this.backgroundEventQueueTimeSensor.add(metrics.metricName("background-event-queue-time-max",
+                metricGroupName,
+                "The maximum time, in milliseconds, that background events are taking to be dequeued."),
+            new Max());
+
+        this.backgroundEventQueueProcessingTimeSensor = metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-avg",
+                metricGroupName,
+                "The average time, in milliseconds, that the consumer took to process all available background events."),
+            new Avg());
+        this.backgroundEventQueueProcessingTimeSensor.add(metrics.metricName("background-event-queue-processing-time-max",
+                metricGroupName,
+                "The maximum time, in milliseconds, that the consumer took to process all available background events."),
+            new Max());
+    }
+
+    public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) {
+        this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll);
+    }
+
+    public void recordApplicationEventQueueSize(int size) {
+        this.applicationEventQueueSizeSensor.record(size);
+    }
+
+    public void recordApplicationEventQueueTime(long time) {
+        this.applicationEventQueueTimeSensor.record(time);
+    }
+
+    public void recordApplicationEventQueueProcessingTime(long processingTime) {
+        this.applicationEventQueueProcessingTimeSensor.record(processingTime);
+    }
+
+    public void recordUnsentRequestsQueueSize(int size) {

Review Comment:
   This metric is about the size of the queue at a given time, so I expect we 
should have another param here, `timeMs`, for the time at which we read the 
size, and we should pass it into `.record`, which has an overload for it.
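   
   A small sketch of the suggested signature. Kafka's `Sensor` has a 
`record(double value, long timeMs)` overload; to keep the example 
self-contained, the hypothetical `SketchSensor` interface below stands in for 
it, and `SketchQueueMetrics` is likewise an illustrative name:

```java
// Hypothetical stand-in mirroring the two-argument record(value, timeMs) overload.
interface SketchSensor {
    void record(double value, long timeMs);
}

class SketchQueueMetrics {
    private final SketchSensor unsentRequestsQueueSizeSensor;

    SketchQueueMetrics(SketchSensor sensor) {
        this.unsentRequestsQueueSizeSensor = sensor;
    }

    // timeMs is the moment the caller read the queue size, so the sample is
    // stamped with the read time instead of whenever the sensor gets invoked.
    void recordUnsentRequestsQueueSize(int size, long timeMs) {
        unsentRequestsQueueSizeSensor.record(size, timeMs);
    }
}
```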



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -170,6 +174,7 @@ private void trySend(final long currentTimeMs) {
         Iterator<UnsentRequest> iterator = unsentRequests.iterator();
         while (iterator.hasNext()) {
             UnsentRequest unsent = iterator.next();
+            kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueTime(currentTimeMs - unsent.enqueuedMs()));

Review Comment:
   Recording this here means we would consider the request removed from the 
unsent queue even when it cannot be sent and actually stays in the unsent 
queue (`!doSend`), right? If so, I guess we should record this only when we do 
remove it from the queue with `iterator.remove()` (either because it has 
expired, or because we did send it).
   
   Also, shouldn't we record this same metric in `checkDisconnects` if the 
request is removed from the unsent queue because the node is disconnected?
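   
   To illustrate the idea, here is a simplified sketch that records the queue 
time only at the point where a request leaves the queue. `SketchUnsentRequest`, 
`SketchSender`, the `Predicate` standing in for `doSend`, and the expiry field 
are all hypothetical simplifications, not the actual `NetworkClientDelegate` 
logic:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

// Hypothetical, simplified request: only the fields the sketch needs.
class SketchUnsentRequest {
    final long enqueuedMs;
    final long expiresAtMs;

    SketchUnsentRequest(long enqueuedMs, long expiresAtMs) {
        this.enqueuedMs = enqueuedMs;
        this.expiresAtMs = expiresAtMs;
    }
}

class SketchSender {
    static void trySend(List<SketchUnsentRequest> unsentRequests,
                        long currentTimeMs,
                        Predicate<SketchUnsentRequest> doSend,  // assumed stand-in for the real send attempt
                        LongConsumer queueTimeRecorder) {       // assumed stand-in for the metrics call
        Iterator<SketchUnsentRequest> iterator = unsentRequests.iterator();
        while (iterator.hasNext()) {
            SketchUnsentRequest unsent = iterator.next();
            boolean expired = currentTimeMs >= unsent.expiresAtMs;
            if (expired || doSend.test(unsent)) {
                // The request really leaves the queue here, so its queue time is final.
                iterator.remove();
                queueTimeRecorder.accept(currentTimeMs - unsent.enqueuedMs);
            }
            // Otherwise the request stays queued and nothing is recorded yet.
        }
    }
}
```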



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1911,6 +1922,8 @@ private boolean processBackgroundEvents() {
 
                 if (!firstError.compareAndSet(null, e))
                     log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+            } finally {
+                kafkaAsyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);

Review Comment:
   Similar to `recordApplicationEventQueueProcessingTime`. The metric 
description states this is about the time "that the consumer took to process 
**all available background events**". Shouldn't we simply take the time from 
right before the loop to right after it ends, and record the metric once per 
run of `processBackgroundEvents`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1893,14 +1899,19 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor will take a reference to the first
      * error, continue to process the remaining events, and then throw the first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
         LinkedList<BackgroundEvent> events = new LinkedList<>();
         backgroundEventQueue.drainTo(events);
+        kafkaAsyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());

Review Comment:
   This is what I was wondering about: could we encapsulate this in a 
`BackgroundEventHandler.drain` or similar, which would take care of draining 
the queue and recording the metric (all metric updates done there consistently)?
   ```suggestion
           LinkedList<BackgroundEvent> events = backgroundEventHandler.drainBackgroundEvents();
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaAsyncConsumerMetrics.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+
+public class KafkaAsyncConsumerMetrics extends KafkaConsumerMetrics implements AutoCloseable {
+    private final Metrics metrics;
+
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time";
+    public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time";
+    public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time";
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final  Sensor applicationEventQueueSizeSensor;

Review Comment:
   nit: extra space before Sensor



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -267,7 +272,9 @@ public void addAll(final List<UnsentRequest> requests) {
     public void add(final UnsentRequest r) {
         Objects.requireNonNull(r);
         r.setTimer(this.time, this.requestTimeoutMs);
+        r.setEnqueuedMs(this.time.milliseconds());
         unsentRequests.add(r);
+        kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordUnsentRequestsQueueSize(unsentRequests.size()));

Review Comment:
   I'm still debating whether this is the best place to record this. We want 
snapshots in time of the queue size. Recording here has the limitation that we 
won't be recording when the size decreases (i.e. requests sent, or failed due 
to disconnections). So I wonder if recording this on poll, which is called 
regularly, would give a better view of the queue size?
   
   The way add/poll are used from `ConsumerNetworkThread.runOnce`, they end up 
being called sequentially anyway, but I'm thinking about the case where, let's 
say, managers are not returning any requests (so `addAll` is called with an 
empty list and `add` is never called), but there could be unsent requests in 
the queue that could be sent out, cancelled, timed out, etc. Thoughts?
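   
   A toy sketch of recording the size on poll rather than on add. Everything 
here is hypothetical (`SketchDelegate`, `SketchSizeRecorder`, and the 
"send at most one request per poll" behaviour are illustrative only); the point 
is that a sample is taken on every poll, including polls where nothing was 
added:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a size metric that accepts the read time.
interface SketchSizeRecorder {
    void record(int size, long timeMs);
}

class SketchDelegate {
    private final Queue<String> unsentRequests = new ArrayDeque<>();
    private final SketchSizeRecorder sizeRecorder;

    SketchDelegate(SketchSizeRecorder sizeRecorder) {
        this.sizeRecorder = sizeRecorder;
    }

    // No metric update here: adds are only one of the ways the size changes.
    void add(String request) {
        unsentRequests.add(request);
    }

    // Called regularly; records a snapshot after whatever this poll sent or dropped.
    void poll(long currentTimeMs) {
        unsentRequests.poll(); // illustrative only: "send" at most one request per poll
        sizeRecorder.record(unsentRequests.size(), currentTimeMs);
    }
}
```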


