kirktrue commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1868623843


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +328,17 @@ void cleanup() {
             log.debug("Closed the consumer network thread");
         }
     }
+
+    /**
+     * If there is a metadata error, completed all uncompleted events with the 
metadata error.
+     */
+    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = 
networkClientDelegate.metadataError().get();
+            if (!events.isEmpty()) {
+                events.forEach(event -> 
event.future().completeExceptionally(metadataError));
+                networkClientDelegate.clearMetadataError();
+            }
+        }
+    }

Review Comment:
   Could we perform a pseudo `getAndClear()` with the metadata error?
   
   ```suggestion
       private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
               if (events.isEmpty())
                   return;
   
               Optional<Exception> error = 
networkClientDelegate.getAndClearMetadataError();
               error.ifPresent(e -> events.forEach(event -> 
event.future().completeExceptionally(e));
           }
       }
   ```
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -165,9 +167,10 @@ private void processApplicationEvents() {
 
         for (ApplicationEvent event : events) {
             try {
-                if (event instanceof CompletableEvent)
+                if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
-
+                    maybeFailOnMetadataError(List.of((CompletableEvent<?>) 
event));

Review Comment:
   Could you add a comment about why we're calling `maybeFailOnMetadataError()` 
here as well as at the end of `runOnce()`? I'm assuming this handles an 
important edge case, but I'm not sure I understand why from just looking at the 
code. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -230,6 +227,14 @@ private ClientRequest makeClientRequest(
             unsent.handler
         );
     }
+    
+    public Optional<Exception> metadataError() {
+        return metadataError;
+    }
+    
+    public void clearMetadataError() {
+        metadataError = Optional.empty();
+    }

Review Comment:
   Then this can be a single operation:
   
   ```suggestion
       public Optional<Exception> getAndClearMetadataError() {
           Optional<Exception> e = metadataError;
           metadataError = Optional.empty();
           return e;
       }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -155,4 +156,13 @@ public int size() {
     public boolean contains(CompletableEvent<?> event) {
         return event != null && tracked.contains(event);
     }
+
+    public List<CompletableEvent<?>> uncompletedEvents() {
+        return tracked.stream()
+                .filter(e -> e instanceof CompletableApplicationEvent<?>)
+                .map(e -> (CompletableApplicationEvent<?>) e)
+                .filter(e -> !e.future().isDone())
+                .collect(Collectors.toList());
+    }

Review Comment:
   You don't have to filter by `CompletableApplicationEvent` is because 
`tracked` holds `CompletableEvents` already. 
   
   ```suggestion
       public List<CompletableEvent<?>> uncompletedEvents() {
           return tracked.stream()
                   .filter(e -> !e.future().isDone())
                   .collect(Collectors.toList());
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to