kirktrue commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1859304634
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -168,6 +171,8 @@ private void processApplicationEvents() { if (event instanceof CompletableEvent) applicationEventReaper.add((CompletableEvent<?>) event); + List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs); Review Comment: Do we need to reap the events on every loop? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java: ########## @@ -32,4 +32,8 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo public CheckAndUpdatePositionsEvent(long deadlineMs) { super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs); } + + public CheckAndUpdatePositionsEvent(long deadlineMs, boolean isPassedByErrorEvent) { + super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs, isPassedByErrorEvent); + } Review Comment: At this point it looks like `CheckAndUpdatePositionsEvent` is the only event that uses `isPassedByErrorEvent`. Any reason not to define it in this event as opposed to the superclass? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ########## @@ -102,6 +102,8 @@ public void reap(long currentTimeMs) { // Second, remove any events that are already complete, just to make sure we don't hold references. This will // include any events that finished successfully as well as any events we just completed exceptionally above. tracked.removeIf(e -> e.future().isDone()); + + return tracked; Review Comment: I'm not sure I understand this pattern. Can we add a new method that returns the tracked events instead of overloading the purpose of `reap()`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -325,4 +330,23 @@ void cleanup() { log.debug("Closed the consumer network thread"); } } + + /** + * If there is a metadata error, complete the completable events which are not passed by + * error event with the metadata error + * @param events the completable events + */ + private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { + if (networkClientDelegate.metadataError().isPresent()) { + Throwable metadataError = networkClientDelegate.metadataError().get(); + List<CompletableEvent<?>> completableApplicationEvent = events.stream() + .filter(event -> !(event instanceof CompletableApplicationEvent && + ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent())) + .collect(Collectors.toList()); + if (!completableApplicationEvent.isEmpty()) { + completableApplicationEvent.forEach(event -> event.future().completeExceptionally(metadataError)); + networkClientDelegate.clearMetadataError(); + } + } + } Review Comment: Do we only want to clear the error if an event "consumed" the error? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -325,4 +330,23 @@ void cleanup() { log.debug("Closed the consumer network thread"); } } + + /** + * If there is a metadata error, complete the completable events which are not passed by + * error event with the metadata error + * @param events the completable events + */ + private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { + if (networkClientDelegate.metadataError().isPresent()) { + Throwable metadataError = networkClientDelegate.metadataError().get(); + List<CompletableEvent<?>> completableApplicationEvent = events.stream() + .filter(event -> !(event instanceof CompletableApplicationEvent && + ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent())) + .collect(Collectors.toList()); + if (!completableApplicationEvent.isEmpty()) { + completableApplicationEvent.forEach(event -> event.future().completeExceptionally(metadataError)); + networkClientDelegate.clearMetadataError(); + } + } + } Review Comment: It seems like this could be a little more succinct: ```suggestion private void maybeFailOnMetadataError() { networkClientDelegate.metadataError().ifPresent(metadataError -> { applicationEventReaper.tracked().stream() .filter(e -> e instanceof CompletableApplicationEvent<?>) .map(e -> (CompletableApplicationEvent<?>) e) .filter(e -> e.isPassedByErrorEvent()) .filter(e -> !e.future().isDone()) .forEach(e -> e.future().completeExceptionally(metadataError)); networkClientDelegate.clearMetadataError(); })); } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -168,6 +171,8 @@ private void processApplicationEvents() { if (event instanceof CompletableEvent) applicationEventReaper.add((CompletableEvent<?>) event); + List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs); Review Comment: It seems like we should remove the reaping from this method entirely and have `runOnce()` call `maybeFailOnMetadataError()` directly. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java: ########## @@ -38,6 +43,13 @@ protected CompletableApplicationEvent(final Type type, final long deadlineMs) { this.deadlineMs = deadlineMs; } + protected CompletableApplicationEvent(final Type type, final long deadlineMs, final boolean isPassedByErrorEvent) { + super(type); + this.future = new CompletableFuture<>(); + this.deadlineMs = deadlineMs; + this.isPassedByErrorEvent = isPassedByErrorEvent; + } Review Comment: I'd prefer to see the other constructor call this one so that we have as few paths as possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org