kirktrue commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1859304634


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -168,6 +171,8 @@ private void processApplicationEvents() {
                 if (event instanceof CompletableEvent)
                     applicationEventReaper.add((CompletableEvent<?>) event);
 
+                List<CompletableEvent<?>> completableEvents = 
reapExpiredApplicationEvents(currentTimeMs);

Review Comment:
   Do we need to reap the events on every loop?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java:
##########
@@ -32,4 +32,8 @@ public class CheckAndUpdatePositionsEvent extends 
CompletableApplicationEvent<Bo
     public CheckAndUpdatePositionsEvent(long deadlineMs) {
         super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
     }
+
+    public CheckAndUpdatePositionsEvent(long deadlineMs, boolean 
isPassedByErrorEvent) {
+        super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs, 
isPassedByErrorEvent);
+    }

Review Comment:
   At this point it looks like `CheckAndUpdatePositionsEvent` is the only event 
that uses `isPassedByErrorEvent`. Any reason not to define it in this event as 
opposed to the superclass?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -102,6 +102,8 @@ public void reap(long currentTimeMs) {
         // Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
         // include any events that finished successfully as well as any events 
we just completed exceptionally above.
         tracked.removeIf(e -> e.future().isDone());
+        
+        return tracked;

Review Comment:
   I'm not sure I understand this pattern. Can we add a new method that returns 
the tracked events instead of overloading the purpose of `reap()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +330,23 @@ void cleanup() {
             log.debug("Closed the consumer network thread");
         }
     }
+
+    /**
+     * If there is a metadata error, complete the completable events which are 
not passed by 
+     * error event with the metadata error
+     * @param events the completable events
+     */
+    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = 
networkClientDelegate.metadataError().get();
+            List<CompletableEvent<?>> completableApplicationEvent = 
events.stream()
+                    .filter(event -> !(event instanceof 
CompletableApplicationEvent && 
+                            ((CompletableApplicationEvent<?>) 
event).isPassedByErrorEvent()))
+                    .collect(Collectors.toList());
+            if (!completableApplicationEvent.isEmpty()) {
+                completableApplicationEvent.forEach(event -> 
event.future().completeExceptionally(metadataError));
+                networkClientDelegate.clearMetadataError();
+            }
+        }
+    }

Review Comment:
   Do we only want to clear the error if an event "consumed" the error?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +330,23 @@ void cleanup() {
             log.debug("Closed the consumer network thread");
         }
     }
+
+    /**
+     * If there is a metadata error, complete the completable events which are 
not passed by 
+     * error event with the metadata error
+     * @param events the completable events
+     */
+    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = 
networkClientDelegate.metadataError().get();
+            List<CompletableEvent<?>> completableApplicationEvent = 
events.stream()
+                    .filter(event -> !(event instanceof 
CompletableApplicationEvent && 
+                            ((CompletableApplicationEvent<?>) 
event).isPassedByErrorEvent()))
+                    .collect(Collectors.toList());
+            if (!completableApplicationEvent.isEmpty()) {
+                completableApplicationEvent.forEach(event -> 
event.future().completeExceptionally(metadataError));
+                networkClientDelegate.clearMetadataError();
+            }
+        }
+    }

Review Comment:
   It seems like this could be a little more succinct:
   
   ```suggestion
       private void maybeFailOnMetadataError() {
          networkClientDelegate.metadataError().ifPresent(metadataError -> {
               applicationEventReaper.tracked().stream()
                   .filter(e -> e instanceof CompletableApplicationEvent<?>)
                   .map(e -> (CompletableApplicationEvent<?>) e)
                   .filter(e -> e.isPassedByErrorEvent())
                   .filter(e -> !e.future().isDone())
                   .forEach(e -> 
e.future().completeExceptionally(metadataError));
               networkClientDelegate.clearMetadataError();
           }));
       }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -168,6 +171,8 @@ private void processApplicationEvents() {
                 if (event instanceof CompletableEvent)
                     applicationEventReaper.add((CompletableEvent<?>) event);
 
+                List<CompletableEvent<?>> completableEvents = 
reapExpiredApplicationEvents(currentTimeMs);

Review Comment:
   It seems like we should remove the reaping from this method entirely and 
have `runOnce()` call `maybeFailOnMetadataError()` directly.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -38,6 +43,13 @@ protected CompletableApplicationEvent(final Type type, final 
long deadlineMs) {
         this.deadlineMs = deadlineMs;
     }
 
+    protected CompletableApplicationEvent(final Type type, final long 
deadlineMs, final boolean isPassedByErrorEvent) {
+        super(type);
+        this.future = new CompletableFuture<>();
+        this.deadlineMs = deadlineMs;
+        this.isPassedByErrorEvent = isPassedByErrorEvent;
+    }

Review Comment:
   I'd prefer to see the other constructor call this one so that we have as few 
paths as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to