majialoong commented on PR #21057:
URL: https://github.com/apache/kafka/pull/21057#issuecomment-3603017424

   I added logging to verify the race condition.
   
   In `MonitorableSinkTask.put()`:
   ```
   @Override
   public void put(Collection<SinkRecord> records) {
       super.put(records);
       count += records.size();
       // add log
       System.out.println("MonitorableSinkTask: count updated to: " + count);
   }
   ```
   
   In `MonitorableSinkIntegrationTest` (before fix):
   ```
   // add log
   log.info("Task metric value: {}", metrics.get(taskMetric).metricValue());
   assertEquals((double) NUM_RECORDS_PRODUCED, metrics.get(taskMetric).metricValue());
   ```
   
   Successful run:
   <img width="2878" height="614" alt="image" src="https://github.com/user-attachments/assets/3d04e96c-bd8c-41eb-a8af-910fcc39db17" />
   
   Failed run:
   <img width="2880" height="794" alt="image" src="https://github.com/user-attachments/assets/add9c69c-6797-412d-97ac-6eeb6871b94f" />
   
   The logs show the race condition:
   1. In the failed run, `awaitRecords()` returns (the latch countdown has completed) before the last `count += records.size()` executes.
   2. The test thread reads the metric value 999.0 while the task thread has not yet updated `count` to 1000.
   3. The `count updated to: 1000` log line appears after the test reads the metric, confirming the race.
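
The ordering described above can be reproduced outside Kafka with a small standalone sketch. It is not the code from this PR: `MetricRaceSketch`, `awaitCondition`, and `run` are hypothetical names, and it assumes the latch is counted down before the counter is incremented, which is what the log ordering suggests. Instead of reading the counter right after the latch opens, the checking thread polls until the counter reaches the expected value or a timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class MetricRaceSketch {
    // Hypothetical stand-in for the task's record counter that backs the metric.
    // Written by one thread only, read by the checking thread.
    static volatile int count = 0;

    // Hypothetical helper: polls until the condition holds or the timeout elapses.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return condition.getAsBoolean(); // one last check at the deadline
            }
            Thread.sleep(10);
        }
        return true;
    }

    static boolean run(int numRecords) throws InterruptedException {
        count = 0;
        CountDownLatch latch = new CountDownLatch(numRecords);
        Thread task = new Thread(() -> {
            for (int i = 0; i < numRecords; i++) {
                latch.countDown(); // assumed to mirror super.put(records)
                count += 1;        // mirrors count += records.size(); can land after the latch opens
            }
        });
        task.start();

        latch.await(); // mirrors awaitRecords()
        // Reading count directly at this point may still observe numRecords - 1.
        boolean reached = awaitCondition(() -> count == numRecords, 5_000);
        task.join();
        return reached;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("counter reached expected value: " + run(1000));
    }
}
```

Polling with a timeout keeps the assertion strict (the counter must eventually equal the number of records) while removing the dependence on which thread runs first after the latch opens.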


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
