lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1857306123
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
 
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();

Review Comment:
   if there were no events to propagate the error, this line will still clear the error. Shouldn't we clear only if we were able to propagate it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -150,6 +152,7 @@ private void maybePropagateMetadataError() {
         try {
             metadata.maybeThrowAnyException();
         } catch (Exception e) {
+            metadataError = Optional.of(e);
             backgroundEventHandler.add(new ErrorEvent(e));

Review Comment:
   Seems to me you're really close to being able to remove this duplicated way of propagating metadata errors (and to simplifying `isPassedByErrorEvent`).
   
   We now have a way to propagate metadata errors based on the triggering event, filling the gap where calls like position need to know about them. Then why exactly is it that we still need to propagate them via ErrorEvents too?
   Seems to be for the consumer.poll case only, but I would expect that (once the gap from the comment above is addressed), the CheckAndUpdatePositions event should fail with subscription metadata errors, no matter if called from consumer.position or from consumer.poll. Makes sense?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
 
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();

Review Comment:
   should we encapsulate this in a kind of `maybeFailOnMetadataError` that would take the events?
   
   Then I wonder if we need to do the same also before actually processing each event? I'm thinking that events that do require subscription metadata (ex. CheckAndUpdatePositions) may be processed, not generate any request, and complete right away, so they will never know about the metadata error, right? Right before:
   
   https://github.com/apache/kafka/blob/57c4c386796028dc4144dfb50a2a786dac0a51a3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L181
   
   Do we need to `maybeFailOnMetadataError(event)` to cover this gap?
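   To visualize how the two call sites could fit together, here is a rough, self-contained sketch. All names (`EventStub`, `NetworkThreadSketch`) are illustrative stand-ins, not the real Kafka internals, and it assumes the stored error is cleared only once it has actually been propagated to at least one event (per the first comment above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Stand-in for a CompletableApplicationEvent: just a future to complete.
class EventStub {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

class NetworkThreadSketch {
    // Stand-in for networkClientDelegate.metadataError()
    private Optional<Throwable> metadataError = Optional.empty();
    private final List<EventStub> processed = new ArrayList<>();

    void onMetadataError(Throwable t) { metadataError = Optional.of(t); }

    // Fails the event with the stored metadata error, if any; returns true
    // when the error was delivered, so the caller can skip processing.
    boolean maybeFailOnMetadataError(EventStub event) {
        if (!metadataError.isPresent())
            return false;
        return event.future.completeExceptionally(metadataError.get());
    }

    // Sketch of runOnce(): check each incoming event *before* processing it
    // (covering events that would otherwise complete right away without ever
    // generating a request), then sweep the events still awaiting responses,
    // and clear the stored error only if it was delivered somewhere.
    void runOnce(List<EventStub> incoming, List<EventStub> awaiting) {
        boolean propagated = false;
        for (EventStub event : incoming) {
            if (maybeFailOnMetadataError(event)) {
                propagated = true;
                continue; // never reaches the processor
            }
            processed.add(event); // stands in for applicationEventProcessor.process(event)
        }
        for (EventStub event : awaiting)
            propagated |= maybeFailOnMetadataError(event);
        if (propagated)
            metadataError = Optional.empty();
    }

    List<EventStub> processed() { return processed; }
    Optional<Throwable> metadataError() { return metadataError; }
}
```

   With this shape, an event that would complete right away is failed before processing, an event left awaiting is failed on the sweep, and a tick with no events keeps the error pending for the next tick instead of silently dropping it.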
   Note that it's not a replacement for the maybeFailOnMetadataError of awaiting events; I wonder if we need them both (to cover events that complete right away and never make it to the "awaiting" list, and also the ones that are left waiting).



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1683,10 +1683,10 @@ private Fetch<K, V> collectFetch() {
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *                                       defined
      */
-    private boolean updateFetchPositions(final Timer timer) {
+    private boolean updateFetchPositions(final Timer timer, final boolean isPassedByErrorEvent) {

Review Comment:
   if the suggestion above works and we can get rid of the ErrorEvent for metadata errors, I hope we could end up removing this param, and just having a kind of requiresSubscriptionMetadata defined in events, with the CheckAndUpdatePositions event setting it to true. Then the network client would know that it needs to completeExceptionally the future for that event (and not for others that are not related to subscription metadata)... just an idea that depends on whether we can simplify the metadata error event.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
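The `requiresSubscriptionMetadata` idea from the last comment could be sketched roughly as follows. Every name here is hypothetical (these are not the real event classes); the point is only that events self-describe their dependency on subscription metadata, so the error is routed to them and not to unrelated events:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical event base: each event declares whether it depends on
// subscription metadata (CheckAndUpdatePositions would return true).
abstract class AppEventSketch {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    boolean requiresSubscriptionMetadata() { return false; }
}

class CheckAndUpdatePositionsSketch extends AppEventSketch {
    @Override
    boolean requiresSubscriptionMetadata() { return true; }
}

// Unrelated to subscription metadata; keeps the default of false.
class CommitSketch extends AppEventSketch { }

class MetadataErrorRouter {
    // Fail only the events that actually depend on subscription metadata,
    // instead of threading an isPassedByErrorEvent flag through the consumer.
    static void failOnMetadataError(List<AppEventSketch> events, Throwable metadataError) {
        events.stream()
              .filter(AppEventSketch::requiresSubscriptionMetadata)
              .forEach(e -> e.future.completeExceptionally(metadataError));
    }
}
```

Under this shape the same routing would apply whether the event came from consumer.position or consumer.poll, which is what the thread argues for.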