kirktrue commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1868623843
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -325,4 +328,17 @@ void cleanup() { log.debug("Closed the consumer network thread"); } } + + /** + * If there is a metadata error, completed all uncompleted events with the metadata error. + */ + private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { + if (networkClientDelegate.metadataError().isPresent()) { + Throwable metadataError = networkClientDelegate.metadataError().get(); + if (!events.isEmpty()) { + events.forEach(event -> event.future().completeExceptionally(metadataError)); + networkClientDelegate.clearMetadataError(); + } + } + } Review Comment: Could we perform a pseudo `getAndClear()` with the metadata error? ```suggestion private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { if (events.isEmpty()) return; Optional<Exception> error = networkClientDelegate.getAndClearMetadataError(); error.ifPresent(e -> events.forEach(event -> event.future().completeExceptionally(e)); } } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -165,9 +167,10 @@ private void processApplicationEvents() { for (ApplicationEvent event : events) { try { - if (event instanceof CompletableEvent) + if (event instanceof CompletableEvent) { applicationEventReaper.add((CompletableEvent<?>) event); - + maybeFailOnMetadataError(List.of((CompletableEvent<?>) event)); Review Comment: Could you add a comment about why we're calling `maybeFailOnMetadataError()` here as well as at the end of `runOnce()`? I'm assuming this handles an important edge case, but I'm not sure I understand why from just looking at the code. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -230,6 +227,14 @@ private ClientRequest makeClientRequest( unsent.handler ); } + + public Optional<Exception> metadataError() { + return metadataError; + } + + public void clearMetadataError() { + metadataError = Optional.empty(); + } Review Comment: Then this can be a single operation: ```suggestion public Optional<Exception> getAndClearMetadataError() { Optional<Exception> e = metadataError; metadataError = Optional.empty(); return e; } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ########## @@ -155,4 +156,13 @@ public int size() { public boolean contains(CompletableEvent<?> event) { return event != null && tracked.contains(event); } + + public List<CompletableEvent<?>> uncompletedEvents() { + return tracked.stream() + .filter(e -> e instanceof CompletableApplicationEvent<?>) + .map(e -> (CompletableApplicationEvent<?>) e) + .filter(e -> !e.future().isDone()) + .collect(Collectors.toList()); + } Review Comment: You don't have to filter by `CompletableApplicationEvent` is because `tracked` holds `CompletableEvents` already. ```suggestion public List<CompletableEvent<?>> uncompletedEvents() { return tracked.stream() .filter(e -> !e.future().isDone()) .collect(Collectors.toList()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org