kirktrue commented on code in PR #17199: URL: https://github.com/apache/kafka/pull/17199#discussion_r1823530021
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java: ########## @@ -37,6 +37,7 @@ public enum Type { * {@link #equals(Object)} and can be used in log messages when debugging. */ private final Uuid id; + private long enqueuedMs; Review Comment: Same comment as per above with `ApplicationEvent`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java: ########## @@ -16,19 +16,23 @@ */ package org.apache.kafka.clients.consumer.internals.metrics; +import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Value; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; public class KafkaConsumerMetrics implements AutoCloseable { + private final GroupProtocol groupProtocol; Review Comment: Would it be possible to create a subclass of `KafkaConsumerMetrics` instead of adding group protocol-specific bits on here? I see that the `ShareConsumerImpl` uses a custom `KafkaShareConsumerMetrics` that at first glance is entirely a subset of `KafkaConsumerMetrics`, so there's some refactoring that could be done here. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -162,15 +171,20 @@ void runOnce() { private void processApplicationEvents() { LinkedList<ApplicationEvent> events = new LinkedList<>(); applicationEventQueue.drainTo(events); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size())); for (ApplicationEvent event : events) { + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs())); + long startMs = time.milliseconds(); Review Comment: Can we swap the ordering of these two lines to avoid the extra call to `time.milliseconds()`? ```suggestion long startMs = time.milliseconds(); kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueTime(startMs - event.enqueuedMs())); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -60,21 +61,24 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier; private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier; private final Supplier<RequestManagers> requestManagersSupplier; + private final Optional<KafkaConsumerMetrics> kafkaConsumerMetrics; private ApplicationEventProcessor applicationEventProcessor; private NetworkClientDelegate networkClientDelegate; private RequestManagers requestManagers; private volatile boolean running; private final IdempotentCloser closer = new IdempotentCloser(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS; + private long lastPollTimeMs = 0L; Review Comment: This is only ever written to and read from the same thread, right? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ########## @@ -61,6 +62,14 @@ public Uuid id() { return id; } + public void setEnqueuedMs(long enqueuedMs) { + this.enqueuedMs = enqueuedMs; + } Review Comment: I don't like that we're introducing mutability into all the events. I understand that this changes _a lot_ less code than having the constructor take the creation time or something. Can you add a comment to the `enqueuedMs` variable stating that, because it is mutable, it should not be used in `hashCode()`, `equals()`, or `toStringBase()` (or something along those lines)? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ########## @@ -42,6 +47,8 @@ public BackgroundEventHandler(final Queue<BackgroundEvent> backgroundEventQueue) */ public void add(BackgroundEvent event) { Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null"); + event.setEnqueuedMs(System.currentTimeMillis()); Review Comment: We need to add a `Time` variable to this class so that we can do this instead: ```suggestion event.setEnqueuedMs(time.milliseconds()); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java: ########## @@ -50,44 +75,124 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { return TimeUnit.SECONDS.convert(now - lastPollMs, TimeUnit.MILLISECONDS); }; this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago", - metricGroupName, "The number of seconds since the last poll() invocation."); + metricGroupName, "The number of seconds since the last poll() invocation."); Review Comment: Can we remove these whitespace changes? They seem extraneous. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java: ########## @@ -39,8 +43,29 @@ public class KafkaConsumerMetrics implements AutoCloseable { private long pollStartMs; private long timeSinceLastPollMs; - public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { + // CONSUMER group protocol metrics + public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = "time-between-network-thread-poll"; + public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = "application-event-queue-size"; + public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = "application-event-queue-time"; + public static final String APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "application-event-queue-processing-time"; + public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = "background-event-queue-size"; + public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = "background-event-queue-time"; + public static final String BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = "background-event-queue-processing-time"; + public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = "unsent-requests-queue-size"; + public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = "unsent-requests-queue-time"; + private Sensor timeBetweenNetworkThreadPollSensor; + private Sensor applicationEventQueueSizeSensor; + private Sensor applicationEventQueueTimeSensor; + private Sensor applicationEventQueueProcessingTimeSensor; + private Sensor backgroundEventQueueSizeSensor; + private Sensor backgroundEventQueueTimeSensor; + private Sensor backgroundEventQueueProcessingTimeSensor; + private Sensor unsentRequestsQueueSizeSensor; + private Sensor unsentRequestsQueueTimeSensor; Review Comment: Can we make these `final` too? It'll be a bit ugly in the `else` block of the constructor to mark them all as `null`. But that goes away if we make this a subclass. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -162,15 +171,20 @@ void runOnce() { private void processApplicationEvents() { LinkedList<ApplicationEvent> events = new LinkedList<>(); applicationEventQueue.drainTo(events); + kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(applicationEventQueue.size())); Review Comment: Once we drain `applicationEventQueue`, it'll be empty, right? If so, why not just do: ```suggestion kafkaConsumerMetrics.ifPresent(metrics -> metrics.recordApplicationEventQueueSize(0)); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java: ########## @@ -111,12 +216,62 @@ public void recordCommitted(long duration) { this.committedSensor.record(duration); } + public void recordTimeBetweenNetworkThreadPoll(long timeBetweenNetworkThreadPoll) { + this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll); + } + + public void recordApplicationEventQueueSize(int size) { + this.applicationEventQueueSizeSensor.record(size); + } + + public void recordApplicationEventQueueTime(long time) { + this.applicationEventQueueTimeSensor.record(time); + } + + public void recordApplicationEventQueueProcessingTime(long processingTime) { + this.applicationEventQueueProcessingTimeSensor.record(processingTime); + } + + public void recordUnsentRequestsQueueSize(int size) { + this.unsentRequestsQueueSizeSensor.record(size); + } + + public void recordUnsentRequestsQueueTime(long time) { + this.unsentRequestsQueueTimeSensor.record(time); + } + + public void recordBackgroundEventQueueSize(int size) { + this.backgroundEventQueueSizeSensor.record(size); + } + + public void recordBackgroundEventQueueTime(long time) { + this.backgroundEventQueueTimeSensor.record(time); + } + + public void recordBackgroundEventQueueProcessingTime(long processingTime) { + 
this.backgroundEventQueueProcessingTimeSensor.record(processingTime); + } + @Override public void close() { metrics.removeMetric(lastPollMetricName); metrics.removeSensor(timeBetweenPollSensor.name()); metrics.removeSensor(pollIdleSensor.name()); metrics.removeSensor(commitSyncSensor.name()); metrics.removeSensor(committedSensor.name()); + + if (groupProtocol == GroupProtocol.CONSUMER) { + Arrays.asList( + timeBetweenNetworkThreadPollSensor.name(), + applicationEventQueueSizeSensor.name(), + applicationEventQueueTimeSensor.name(), + applicationEventQueueProcessingTimeSensor.name(), + backgroundEventQueueSizeSensor.name(), + backgroundEventQueueTimeSensor.name(), + backgroundEventQueueProcessingTimeSensor.name(), + unsentRequestsQueueSizeSensor.name(), + unsentRequestsQueueTimeSensor.name() + ).forEach(metrics::removeSensor); + } Review Comment: This is another place that having a subclass would help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org