kirktrue commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1859369437


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -507,7 +511,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
 
         BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
-        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);

Review Comment:
   ```suggestion
           this.backgroundEventHandler = new BackgroundEventHandler(
               backgroundEventQueue,
               time,
               asyncConsumerMetrics
           );
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -214,10 +213,11 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
-    private final KafkaConsumerMetrics kafkaConsumerMetrics;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;

Review Comment:
   I'd prefer it be left as `kafkaConsumerMetrics`. Calling out the distinction 
in the variable name isn't really adding anything (IMO). 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -350,7 +352,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
                     metrics,
                     fetchMetricsManager.throttleTimeSensor(),
                     clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
-                    backgroundEventHandler);
+                    backgroundEventHandler,
+                asyncConsumerMetrics);

Review Comment:
   Nit: minor alignment issue:
   
   ```suggestion
                       backgroundEventHandler,
                       asyncConsumerMetrics);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -61,6 +62,14 @@ public Uuid id() {
         return id;
     }
 
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }

Review Comment:
   > we probably should be able to see the enqueued time in the string representation of the events
   
   Agreed. Good point. Thanks @AndrewJSchofield.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -338,7 +339,8 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
 
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
-            final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+            this.backgroundEventHandler = new BackgroundEventHandler(
+                backgroundEventQueue, time, asyncConsumerMetrics);

Review Comment:
   Nitpick: for better or worse, we've adopted this style for multi-line 
parameter lists:
   
   ```suggestion
               this.backgroundEventHandler = new BackgroundEventHandler(
                   backgroundEventQueue,
                   time,
                   asyncConsumerMetrics
               );
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java:
##########
@@ -38,6 +38,12 @@ public enum Type {
      */
     private final Uuid id;
 
+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not be used in hashCode, equals, or toStringBase.
+     */
+    private long enqueuedMs;

Review Comment:
   We also need the `enqueuedMs` in the `toStringBase()` method as per the 
`ApplicationEvent`’s method of the same name.
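
For illustration only, here is a minimal sketch of the shape this comment seems to ask for: the mutable enqueue timestamp appears in `toStringBase()` but stays out of `equals` and `hashCode`. `SketchEvent` is a hypothetical stand-in, not the actual `BackgroundEvent` or `ApplicationEvent` class, and `java.util.UUID` is assumed in place of Kafka's `Uuid` so the block compiles on its own.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for an event class; NOT the real Kafka BackgroundEvent.
class SketchEvent {
    private final String type;
    private final UUID id;

    // Mutable: assumed to be set when the event is placed on a queue,
    // so it is deliberately excluded from equals() and hashCode().
    private long enqueuedMs;

    SketchEvent(String type) {
        this.type = Objects.requireNonNull(type);
        this.id = UUID.randomUUID();
    }

    void setEnqueuedMs(long enqueuedMs) {
        this.enqueuedMs = enqueuedMs;
    }

    long enqueuedMs() {
        return enqueuedMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchEvent)) return false;
        SketchEvent that = (SketchEvent) o;
        // Identity depends only on the immutable fields.
        return type.equals(that.type) && id.equals(that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, id);
    }

    // The enqueue time is included here so it shows up in logs.
    protected String toStringBase() {
        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}
```

In this sketch, calling `setEnqueuedMs` after construction changes the string representation but leaves the hash code stable, so an instance stored in a hash-based collection before being enqueued can still be found afterwards.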



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -42,26 +44,32 @@
 public class ApplicationEventHandler implements Closeable {
 
     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;

Review Comment:
   I agree that this is an area that could use some refactoring. Thanks for 
filing KAFKA-18048, @FrankYang0529.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
