lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1868113120


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -155,4 +156,13 @@ public int size() {
     public boolean contains(CompletableEvent<?> event) {
         return event != null && tracked.contains(event);
     }
+
+    public List<CompletableEvent<?>> reapUnCompletableApplicationEvents() {

Review Comment:
   This is not really "reaping"/cancelling any event, so should we rename it? It just retrieves the uncompleted events, so maybe `uncompletedEvents`/`trackedEvents`?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -325,4 +327,18 @@ void cleanup() {
             log.debug("Closed the consumer network thread");
         }
     }
+
+    /**
+     * If there is a metadata error, completed all uncompleted events with the metadata error.
+     */
+    private void maybeFailOnMetadataError() {
+        List<CompletableEvent<?>> events = applicationEventReaper.reapUnCompletableApplicationEvents();

Review Comment:
   This is called from within the loop over the events on line 171, so in that case we would iterate over all the events in the reaper once for each event in the loop, right?

   What about having this receive the list of events as an argument? Then we have 2 usages:

   1. `maybeFailOnMetadataError(pendingCompletableEvents)` -> called with a list of events after all events are processed on line 157. Argument taken from `reaper.trackedEvents`.
   2. `maybeFailOnMetadataError(List.of(event))` -> called before processing each event. Argument taken from the current event in the loop.

   Thoughts?
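
   To make the suggestion concrete, here is a minimal, self-contained sketch of the two call sites. It is not the actual Kafka code: `MetadataErrorSketch`, `track`, `setMetadataError` and the consume-once handling of the error are assumptions made for illustration, and plain `CompletableFuture`s stand in for `CompletableEvent`s. The accessor uses the `uncompletedEvents` name proposed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the network thread plus reaper; not the Kafka classes.
class MetadataErrorSketch {

    // Stand-in for the events tracked by the reaper.
    private final List<CompletableFuture<?>> tracked = new ArrayList<>();

    // Assumption: a pending metadata error is consumed (cleared) the first time it is used.
    private Optional<Exception> metadataError = Optional.empty();

    void track(CompletableFuture<?> event) {
        tracked.add(event);
    }

    void setMetadataError(Exception error) {
        metadataError = Optional.of(error);
    }

    // Only returns the tracked events that are not done yet; nothing is reaped or cancelled.
    List<CompletableFuture<?>> uncompletedEvents() {
        List<CompletableFuture<?>> result = new ArrayList<>();
        for (CompletableFuture<?> event : tracked) {
            if (!event.isDone()) {
                result.add(event);
            }
        }
        return result;
    }

    // If there is a metadata error, fail every not-yet-completed event in the given list with it.
    void maybeFailOnMetadataError(List<CompletableFuture<?>> events) {
        if (!metadataError.isPresent()) {
            return;
        }
        Exception error = metadataError.get();
        metadataError = Optional.empty();
        for (CompletableFuture<?> event : events) {
            event.completeExceptionally(error); // no effect on events that are already completed
        }
    }

    public static void main(String[] args) {
        MetadataErrorSketch thread = new MetadataErrorSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        thread.track(first);
        thread.track(second);

        // Usage 2: before processing each event, check only the current event.
        thread.setMetadataError(new IllegalStateException("metadata error"));
        thread.maybeFailOnMetadataError(List.of(first));

        // Usage 1: after all events are processed, check everything still uncompleted.
        thread.setMetadataError(new IllegalStateException("another metadata error"));
        thread.maybeFailOnMetadataError(thread.uncompletedEvents());

        System.out.println(first.isCompletedExceptionally() + " " + second.isCompletedExceptionally());
    }
}
```

   With the list passed in, the per-event call touches a single event instead of walking every tracked event on each loop iteration, and the full scan happens only once after the loop.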