lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1857306123


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
 
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();

Review Comment:
   If there were no events to propagate the error to, this line will still clear it. Shouldn't we clear only if we were able to propagate it?
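   One way to read the suggestion above, as a standalone sketch (the `MetadataErrorHolder` name and its methods are illustrative stand-ins, not the real `NetworkClientDelegate` API):

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for NetworkClientDelegate's metadata error tracking.
class MetadataErrorHolder {
    private Optional<Throwable> metadataError = Optional.empty();

    void record(Throwable t) { metadataError = Optional.of(t); }
    Optional<Throwable> metadataError() { return metadataError; }
    void clear() { metadataError = Optional.empty(); }

    // Fail the given futures with the pending error, clearing it only if
    // at least one future actually received the error.
    int propagateTo(List<CompletableFuture<?>> futures) {
        if (metadataError.isEmpty())
            return 0;
        Throwable error = metadataError.get();
        int propagated = 0;
        for (CompletableFuture<?> f : futures) {
            // completeExceptionally returns true only if this call
            // transitioned the future, i.e. someone will observe the error
            if (f.completeExceptionally(error))
                propagated++;
        }
        if (propagated > 0)
            clear();   // only clear once the error has been delivered
        return propagated;
    }
}
```

   With this shape, a cycle that reaps no events leaves the error pending for the next `runOnce`.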



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -150,6 +152,7 @@ private void maybePropagateMetadataError() {
         try {
             metadata.maybeThrowAnyException();
         } catch (Exception e) {
+            metadataError = Optional.of(e);
             backgroundEventHandler.add(new ErrorEvent(e));

Review Comment:
   Seems to me you're really close to being able to remove this duplicated way of propagating metadata errors (and to simplifying `isPassedByErrorEvent`).
   
   We now have a way to propagate metadata errors based on the triggering event, filling the gap where calls like `position` need to know about them. Then why exactly do we still need to propagate them via `ErrorEvent` too? It seems to be for the `consumer.poll` case only, but I would expect that (once the gap from the comment above is addressed) the `CheckAndUpdatePositions` event should fail with subscription metadata errors, no matter whether it is triggered from `consumer.position` or from `consumer.poll`. Makes sense?
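   A minimal sketch of that single propagation path, with stand-in types (this is not the real `Metadata` or event-processor code): the processor checks for metadata errors while handling a `CheckAndUpdatePositions`-style event and fails that event's future directly, so both the `consumer.position` and `consumer.poll` paths observe the error through the same event, with no separate `ErrorEvent` needed.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class SinglePathSketch {
    // Simplified stand-in for Metadata's error surfacing.
    static class FakeMetadata {
        Optional<RuntimeException> pendingError = Optional.empty();

        void maybeThrowAnyException() {
            if (pendingError.isPresent()) {
                RuntimeException e = pendingError.get();
                pendingError = Optional.empty();
                throw e;
            }
        }
    }

    // Processing a positions-update event: the triggering event itself
    // carries the metadata error back to whichever API call issued it.
    static CompletableFuture<Void> processCheckAndUpdatePositions(FakeMetadata metadata) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            metadata.maybeThrowAnyException(); // fail fast on metadata errors
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
            return future;
        }
        // ... normal position-update logic would go here ...
        future.complete(null);
        return future;
    }
}
```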



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -153,7 +154,16 @@ void runOnce() {
                 .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                 .reduce(Long.MAX_VALUE, Math::min);
 
-        reapExpiredApplicationEvents(currentTimeMs);
+        List<CompletableEvent<?>> completableEvents = reapExpiredApplicationEvents(currentTimeMs);
+
+        // If there is a metadata error, complete the completable events which are not passed by error event with the metadata error
+        if (networkClientDelegate.metadataError().isPresent()) {
+            Throwable metadataError = networkClientDelegate.metadataError().get();
+            completableEvents.stream()
+                    .filter(event -> !(event instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) event).isPassedByErrorEvent()))
+                    .forEach(event -> event.future().completeExceptionally(metadataError));
+            networkClientDelegate.clearMetadataError();

Review Comment:
   Should we encapsulate this in a kind of `maybeFailOnMetadataError` that would take the events?
   
   Then I wonder if we need to do the same before actually processing each event too. I'm thinking that events that do require subscription metadata (e.g. `CheckAndUpdatePositions`) may be processed, generate no request, and complete right away, so they will never know about the metadata error, right? Right before:
   
   https://github.com/apache/kafka/blob/57c4c386796028dc4144dfb50a2a786dac0a51a3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L181
   
   Do we need a `maybeFailOnMetadataError(event)` there to cover this gap?
   
   Note that it's not a replacement for the `maybeFailOnMetadataError` on awaiting events; I wonder if we need them both (to cover events that complete right away and never make it to the "awaiting" list, as well as the ones that are left waiting).
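   The two call sites can be sketched like this (a standalone illustration with simplified stand-in types, not the real `ConsumerNetworkThread` collaborators):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class RunOnceSketch {
    static class Event {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    Optional<Throwable> metadataError = Optional.empty();
    final List<Event> awaiting = new ArrayList<>();

    // Shared helper: fail the given events with any pending metadata error.
    void maybeFailOnMetadataError(List<Event> events) {
        metadataError.ifPresent(error ->
                events.forEach(e -> e.future.completeExceptionally(error)));
    }

    void runOnce(List<Event> incoming) {
        for (Event event : incoming) {
            // Call site 1: before processing, so an event that would
            // otherwise complete right away still sees the error.
            maybeFailOnMetadataError(List.of(event));
            if (!event.future.isDone())
                awaiting.add(event);   // stand-in for process(event)
        }
        // Call site 2: for events left waiting on responses.
        maybeFailOnMetadataError(awaiting);
    }
}
```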



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1683,10 +1683,10 @@ private Fetch<K, V> collectFetch() {
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *                                       defined
      */
-    private boolean updateFetchPositions(final Timer timer) {
+    private boolean updateFetchPositions(final Timer timer, final boolean isPassedByErrorEvent) {

Review Comment:
   If the suggestion below works and we can get rid of the `ErrorEvent` for metadata errors, I hope we could end up removing this param and just having a kind of `requiresSubscriptionMetadata` defined on events, which the `CheckAndUpdatePositions` event would set to true. The network client would then know that it needs to `completeExceptionally` the future for that event (and not for others that are unrelated to subscription metadata)... just an idea that depends on whether we can simplify the metadata error event.
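   The `requiresSubscriptionMetadata` idea could look roughly like this (illustrative names, not the real `CompletableApplicationEvent` hierarchy):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MetadataAwareEvents {
    static abstract class AppEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        // Events opt in to being failed on subscription metadata errors.
        boolean requiresSubscriptionMetadata() { return false; }
    }

    static class CheckAndUpdatePositionsEvent extends AppEvent {
        @Override
        boolean requiresSubscriptionMetadata() { return true; }
    }

    static class CommitEvent extends AppEvent { }   // unrelated to metadata

    // The network layer fails only events that declared a dependency on
    // subscription metadata; others keep waiting for their responses.
    static void failMetadataDependents(List<AppEvent> events, Throwable error) {
        for (AppEvent event : events)
            if (event.requiresSubscriptionMetadata())
                event.future.completeExceptionally(error);
    }
}
```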



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
