lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1858732555


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
 
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+        
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();

Review Comment:
   The trick is that events may complete right away on the call to process, so 
they will never fail with the metadata error, because we would be calling 
completeExceptionally on a future that is already complete, right? I wonder if 
this gap is the reason why you still need the `ErrorEvent` for metadata. If we 
remove the error event propagation on ln 156 and fill this gap, would it work? 
It would be nice because there would be a single, consistent way of 
propagating metadata errors: via the calling API event. E.g., we would get 
TopicAuthException on poll and on positions in the same way, just because 
CheckAndUpdatePositions fails. Makes sense? Just suggesting to try it out; I 
could still be missing something, but I can see the gap in how we 
completeExceptionally with metadata errors.
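
To illustrate the gap, here is a minimal standalone sketch using plain `java.util.concurrent.CompletableFuture`. It is not Kafka code: the class name, the `failLate` helper, and the `RuntimeException` standing in for a metadata error are all hypothetical. It only shows that `completeExceptionally` has no effect on a future that has already completed, which is what would happen to an event completed during processing.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteExceptionallyGap {

    // Hypothetical helper: tries to fail the future after the fact.
    // Returns true only if this call actually moved the future to a completed state.
    static boolean failLate(CompletableFuture<String> future, Throwable error) {
        return future.completeExceptionally(error);
    }

    public static void main(String[] args) {
        // Stand-in for a metadata error; the real exception type is not modeled here.
        RuntimeException metadataError = new RuntimeException("stand-in metadata error");

        // Simulates an event whose future was completed right away while being processed.
        CompletableFuture<String> completedDuringProcess = new CompletableFuture<>();
        completedDuringProcess.complete("done");

        // Simulates an event whose future is still pending when the error is applied.
        CompletableFuture<String> stillPending = new CompletableFuture<>();

        System.out.println(failLate(completedDuringProcess, metadataError)); // prints "false"
        System.out.println(failLate(stillPending, metadataError));           // prints "true"

        // The already-completed future keeps its normal result; the error is lost for it.
        System.out.println(completedDuringProcess.isCompletedExceptionally()); // prints "false"
        System.out.println(stillPending.isCompletedExceptionally());           // prints "true"
    }
}
```

Under these assumptions, only the pending future ever surfaces the error, so any fix would need to apply the metadata error before (or as part of) completing the event.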



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org