majialoong commented on PR #21476:
URL: https://github.com/apache/kafka/pull/21476#issuecomment-3897965113

   To analyze this problem, I added logs in the following four places:
   
   1. `AbstractFetch#prepareFetchRequests`: This log can record the position 
used when built a fetch request.
   <img width="2060" height="944" alt="image" 
src="https://github.com/user-attachments/assets/6cffa76f-b8cf-47b9-842b-d0652074efda";
 />
   
   
   2. `CompletedFetch#drain`: This log can record the update time of 
isConsumed. 
   <img width="2026" height="856" alt="image" 
src="https://github.com/user-attachments/assets/37889bf6-e2b9-4967-8f41-b8b8e655f435";
 />
   
   
   3. `FetchCollector#fetchRecords`: This log can record the update time of the 
position.
   <img width="2144" height="736" alt="image" 
src="https://github.com/user-attachments/assets/cd0aebaa-a3ed-4637-823a-ddc1c76edce8";
 />
   
   
   4. `FetchCollector#handleInitializeSuccess`: This log indicates that the 
client did indeed receive a duplicate offset response.
   <img width="2000" height="818" alt="image" 
src="https://github.com/user-attachments/assets/c0839722-fae5-4f58-b398-55c695d7691d";
 />
   
   ---
   
   When a flaky condition occurs, the log output is as follows, showing that 
two requests were made for offset=2:
   <img width="2858" height="880" alt="image" 
src="https://github.com/user-attachments/assets/c44bc76d-8ac1-412b-9454-a5d874dde237";
 />
   
   The log output was out of order. The logs were reordered based on time, and 
the results are as follows:
   
   ```
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 0 
thread=consumer_background_thread time=3109265203665
   
   [RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with 
nextFetchOffset=1 thread=Test worker time=3109276422998
   [RACE-DIAG] position updated for partition topicA-0 from offset 0 to offset 
1 thread=Test worker time=3109276560248
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 1 
thread=consumer_background_thread time=3109277235165
   
   [RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with 
nextFetchOffset=2 thread=Test worker time=3109283355457
   [RACE-DIAG] position updated for partition topicA-0 from offset 1 to offset 
2 thread=Test worker time=3109283482540
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 2 
thread=consumer_background_thread time=3109284318457
   
   [RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with 
nextFetchOffset=3 thread=Test worker time=3109297402665
   // ---> The Fetch request was built before the position update, so the 
offset is still 2.
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 2 
thread=consumer_background_thread time=3109297466248
   [RACE-DIAG] position updated for partition topicA-0 from offset 2 to offset 
3 thread=Test worker time=3109297562290
   // ---> This indicates that the client received a data response with an 
incorrect offset.
   [RACE-DIAG] Discarding stale fetch response for partition topicA-0 
fetchOffset=2 currentPosition=FetchPosition{offset=3, offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:44561 (id: 1 rack: null 
isFenced: false)], epoch=1}} thread=Test worker time=3109339984332
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 3 
thread=consumer_background_thread time=3109348331623
   
   [RACE-DIAG] drain() set isConsumed=true for partition topicA-0 with 
nextFetchOffset=4 thread=Test worker time=3109349705332
   [RACE-DIAG] position updated for partition topicA-0 from offset 3 to offset 
4 thread=Test worker time=3109349783498
   [RACE-DIAG] fetch request built for partition topicA-0 at offset 4 
thread=consumer_background_thread time=3109350251540 
   ```
   
   ---
   
   The logs indicate that the fetch request was built using the old position 
and was discarded on the client side.
   
   I ran the test method 500 times locally using the modified code from the 
current PR, and the flaky issue no longer occurred.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to