lianetm commented on PR #15215:
URL: https://github.com/apache/kafka/pull/15215#issuecomment-1904663735

   Hey @dajac, I made the changes discussed above, for using an internal flag 
instead, only affecting the fetchable and initializing positions states. Couple 
of comments:
   
   1. Regarding the use of assignedPartitions here (instead of the 
addedPartitions). I initially went with the addedPartitions also, thinking that 
we could allow to continue fetching from the previously owned partitions, but 
then went with the assignedPartitions to keep how the legacy logic behaves (it 
effectively stops fetching from all partitions, given that the callback blocks 
the polling loop). Also the onPartitionsAssigned callback receives the full set 
of assignedPartitions as well, so we could expect the user potentially playing 
with positions for them as well.
   2. Regarding the initializing offsets part, the way the async consumer 
executes callbacks and update positions it all happens in the Application 
thread sequentially (execute callback first 
[here](https://github.com/apache/kafka/blob/62ce551826192ef6137bc3ce670277f79bd3dee2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1632),
 then updates positions). Still, the intention there is not very clear, as it 
is processing multiple background events, so I added the changes in the core 
subscription state, with the clear intention of not initializing offsets if the 
partition is involved in the callback, consistent with the way we prevent 
fetching.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to