DL1231 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2510111224


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,7 +865,14 @@ public ShareAcquiredRecords acquire(
                     // In record_limit mode, we need to ensure that we do not acquire more than
                     // maxRecordsToAcquire. Hence, pass the remaining number of records that can
                     // be acquired.
-                    int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+                    int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, acquiredCount, inFlightBatch, result);
+                    // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+                    // to prevent acquiring any new records afterwards.
+                    if (acquiredSubsetCount < 0) {
+                        maxRecordsToAcquire = -1;
+                        acquiredCount += acquiredSubsetCount == Integer.MIN_VALUE ? 0 : -1 * acquiredSubsetCount;

Review Comment:
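   A negative return value from `acquireSubsetBatchRecords` acts as a flag: it signals that a bad record was hit, while its magnitude still carries the number of good records acquired before it, so those records are still delivered.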
   
   **Example Scenarios:**
   
   1. **Normal case (no bad records):**
      - Returns `3` → acquired 3 records normally
      - Processing continues until `maxRecordsToAcquire` is reached
   2. **Bad record encountered after some good records:**
      - Acquires 3 good records, then hits a bad record
      - Returns `-3` → signals "deliver the 3 good records AND stop further acquisition due to bad record"
      - Outer code: `acquiredCount += -1 * (-3)`, i.e. `acquiredCount += 3`
   3. **First record is bad (edge case):**
      - First record is bad, zero records acquired
      - Returns `Integer.MIN_VALUE` → signals "no records to deliver AND stop due to bad record" (a negated zero would still be `0` and would not register as negative, hence the dedicated sentinel)
      - Outer code: `acquiredCount += 0` (no increment)
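
The three scenarios above can be sketched as a small encode/decode pair. This is only an illustration of the convention described in this comment, not the code in `SharePartition`: the class `SentinelCountSketch` and the helpers `encode`, `badRecordHit`, and `deliveredCount` are hypothetical names introduced here, and only the decoding expression mirrors the diff.

```java
// Illustrative sketch only; names are hypothetical and not part of SharePartition.
final class SentinelCountSketch {

    // Encodes the result of a subset acquisition as a single int.
    // Non-negative: normal case. Negative: a bad record was hit.
    static int encode(int goodRecords, boolean badRecordHit) {
        if (!badRecordHit) {
            return goodRecords;
        }
        // -0 == 0 cannot signal "bad record", so zero good records
        // is represented by Integer.MIN_VALUE instead.
        return goodRecords == 0 ? Integer.MIN_VALUE : -goodRecords;
    }

    // True when the caller should stop acquiring further records.
    static boolean badRecordHit(int returned) {
        return returned < 0;
    }

    // Number of good records to add to the running acquired count.
    static int deliveredCount(int returned) {
        if (returned >= 0) {
            return returned;
        }
        // Same expression as in the diff for the negative case.
        return returned == Integer.MIN_VALUE ? 0 : -1 * returned;
    }

    public static void main(String[] args) {
        int[] samples = {
            encode(3, false), // scenario 1
            encode(3, true),  // scenario 2
            encode(0, true)   // scenario 3
        };
        for (int returned : samples) {
            System.out.println(returned + " -> deliver " + deliveredCount(returned)
                + ", stop=" + badRecordHit(returned));
        }
    }
}
```

One trade-off of this encoding is that a single `int` carries two pieces of information, so every caller has to remember the `Integer.MIN_VALUE` special case when decoding.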



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
