kirktrue commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1859369437

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -507,7 +511,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
-        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);

Review Comment:
```suggestion
        this.backgroundEventHandler = new BackgroundEventHandler(
            backgroundEventQueue,
            time,
            asyncConsumerMetrics
        );
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -214,10 +213,11 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
-    private final KafkaConsumerMetrics kafkaConsumerMetrics;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;

Review Comment:
I'd prefer it be left as `kafkaConsumerMetrics`. Calling out the distinction in the variable name isn't really adding anything (IMO).

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -350,7 +352,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
                 metrics,
                 fetchMetricsManager.throttleTimeSensor(),
                 clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
-                backgroundEventHandler);
+                backgroundEventHandler,
+                asyncConsumerMetrics);

Review Comment:
Nit: minor alignment issue:

```suggestion
                backgroundEventHandler,
                asyncConsumerMetrics);
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }

+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }

Review Comment:
> we probably should be able to see the enqueued time in the string representation of the events

Agreed. Good point. Thanks @AndrewJSchofield.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -338,7 +339,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
         ApiVersions apiVersions = new ApiVersions();
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);

Review Comment:
Nitpick: for better or worse, we've adopted this style for multi-line parameter lists:

```suggestion
        this.backgroundEventHandler = new BackgroundEventHandler(
            backgroundEventQueue,
            time,
            asyncConsumerMetrics
        );
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java:
##########
@@ -38,6 +38,12 @@ public enum Type {
      */
     private final Uuid id;

+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase.
+     */
+    private long enqueuedMs;

Review Comment:
We also need the `enqueuedMs` in the `toStringBase()` method as per the `ApplicationEvent`’s method of the same name.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -42,26 +44,32 @@
 public class ApplicationEventHandler implements Closeable {

     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;

Review Comment:
I agree that this is an area that could use some refactoring. Thanks for filing KAFKA-18048, @FrankYang0529.