kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2412300805


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,15 +1827,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
             return fetch;
         }
 
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure
-
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+        if (pollTimeout > retryBackoffMs) {

Review Comment:
   That said, I will test locally with the following code snippet:
   
   ```java
           // With the non-blocking poll design, it's possible that at this point the background thread is
           // concurrently working to update positions. Therefore, a _copy_ of the current assignment is retrieved
           // and iterated looking for any partitions with invalid positions. This is done to avoid being stuck
           // in poll for an unnecessarily long amount of time if we are missing some positions since the offset
           // lookup may be backing off after a failure.
           if (pollTimeout > retryBackoffMs) {
               for (TopicPartition tp : subscriptions.assignedPartitions()) {
                   if (!subscriptions.hasValidPosition(tp)) {
                       pollTimeout = retryBackoffMs;
                       break;
                   }
               }
           }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,15 +1827,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
             return fetch;
         }
 
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure
-
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+        if (pollTimeout > retryBackoffMs) {

Review Comment:
   > Secondly, I guess we should avoid using the subscription.hasAllFetchPositions here in the app thread because the assignment it iterates could change in the background.
   
   `hasAllFetchPositions()`, like nearly all of the `public` methods in `SubscriptionState`, is `synchronized`. Or is there a different sort of race condition you're worried about?
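   
   To make the question concrete, here is a minimal sketch of the distinction. It uses a hypothetical `PositionTracker` stand-in, not Kafka's `SubscriptionState`, and every name in it is an assumption for illustration. Each `synchronized` method is atomic on its own, so a single `hasAllFetchPositions()` call sees one consistent state. A sequence of separate calls (copy the assignment, then check each partition) holds the lock only per call, so the assignment can change in between; iterating a copy avoids a `ConcurrentModificationException`, but the answer may be slightly stale.
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical stand-in for a subscription-state class; NOT Kafka's SubscriptionState.
   final class PositionTracker {
       private final Set<String> assigned = new HashSet<>();
       private final Map<String, Long> positions = new HashMap<>();
   
       public synchronized void assign(String partition) {
           assigned.add(partition);
       }
   
       public synchronized void seek(String partition, long offset) {
           // Only assigned partitions can hold a position.
           if (assigned.contains(partition)) {
               positions.put(partition, offset);
           }
       }
   
       // Returns a copy, so callers can iterate it without holding the lock.
       public synchronized Set<String> assignedPartitions() {
           return new HashSet<>(assigned);
       }
   
       public synchronized boolean hasValidPosition(String partition) {
           return positions.containsKey(partition);
       }
   
       // One lock acquisition: the whole scan observes a single consistent state.
       public synchronized boolean hasAllFetchPositions() {
           return positions.keySet().containsAll(assigned);
       }
   
       // Hypothetical helper mirroring the loop under discussion. The lock is taken
       // once per call, so the state may change between calls; the copy keeps the
       // iteration itself safe.
       public static long effectiveTimeout(PositionTracker tracker, long pollTimeout, long retryBackoffMs) {
           if (pollTimeout > retryBackoffMs) {
               for (String tp : tracker.assignedPartitions()) {
                   if (!tracker.hasValidPosition(tp)) {
                       return retryBackoffMs;
                   }
               }
           }
           return pollTimeout;
       }
   }
   ```
   
   In this sketch both approaches are free of data races; they differ only in whether the check is a single atomic snapshot or a series of individually atomic reads.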


