kirktrue commented on PR #20324:
URL: https://github.com/apache/kafka/pull/20324#issuecomment-3571752925

   > Another gap I noticed, not when collecting from the buffer, but when 
sending pre-fetches (so it's not directly with the changes of this PR, but 
related).
   > 
   > Here we're ensuring we collect data from the buffer only when there is no 
need to validate positions (or when positions were validated). Following a 
similar principle, I expect we should ensure pre-fetches are only built when we 
have updated the partition state (metadata changed? needs validation?). The 
classic consumer ensures that here:
   > 
   > 
https://github.com/apache/kafka/blob/dc9ae8bf805a93896104a51417318729e032fc3e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L674-L675
   > 
   > But I don't see how we ensure that in the async consumer. We send pre-fetches 
by directly triggering a `CreateFetchRequestsEvent`, which will generate fetch 
requests without `validatePositionsOnMetadataChange`. The gap seems clear when 
we look at the usages of `fetchRequestManager.createFetchRequests()`:
   > 
   > * from AsyncPollEvent -> here it's ok, we only createFetchRequests after 
having updated positions
   > * from process(CreateFetchRequestsEvent) -> this is the gap I'm seeing, 
triggered for pre-fetching (could we include partitions that may need 
validation but, since we didn't call validatePositionsOnMetadataChange, we don't 
notice?)
   > 
   > Am I missing something or is it indeed a gap?
   
   Yes, a `CreateFetchRequestsEvent` may be _enqueued_ before the processing 
of the `AsyncPollEvent` has completed, but the background thread handles one 
event at a time, so it doesn't start _processing_ the 
`CreateFetchRequestsEvent` right away. The background processing of 
`AsyncPollEvent` runs without pause until after `validatePositionsIfNeeded()` 
returns, so the `CreateFetchRequestsEvent` is not _processed_ until the 
`AsyncPollEvent` processing (including `validatePositionsIfNeeded()`) has 
completed.
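
   The ordering argument can be illustrated with a minimal sketch. This is not 
the consumer's actual code: the class name, the `Runnable` events, and the log 
strings are hypothetical stand-ins, and the only assumption carried over is 
that a single background thread drains a FIFO event queue. An event enqueued 
while another is being processed only runs after the current one returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one background thread draining a FIFO queue of events.
final class EventOrderingSketch {

    static List<String> run() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        // Written only by the background thread; read after join().
        List<String> log = new ArrayList<>();

        // Stand-in for the pre-fetch event (CreateFetchRequestsEvent).
        Runnable createFetchRequestsEvent =
            () -> log.add("prefetch: createFetchRequests");

        // Stand-in for AsyncPollEvent processing.
        Runnable asyncPollEvent = () -> {
            log.add("asyncPoll: start");
            // Simulate a pre-fetch event arriving while this event is
            // still being processed. It is enqueued, not processed.
            queue.add(createFetchRequestsEvent);
            log.add("asyncPoll: validatePositions");
            log.add("asyncPoll: createFetchRequests");
        };

        queue.add(asyncPollEvent);

        Thread background = new Thread(() -> {
            try {
                // Process exactly two events, strictly one at a time.
                for (int i = 0; i < 2; i++) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

   In this sketch the pre-fetch entry always lands after the validation entry, 
because the loop cannot pick up the second event until the first handler 
returns.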


