DL1231 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2510224636
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,6 +1899,11 @@ private int acquireSubsetBatchRecords(
continue;
}
+ // If the record has any pending deliveries, return
immediately and do not deliver the current bad record.
+ if (offsetState.getValue().deliveryCount() >=
BAD_RECORD_DELIVERY_THRESHOLD && (hasBeenAcquired > 0 || acquiredCount > 0)) {
+ return -acquiredCount;
+ }
+
Review Comment:
### Example Scenario
**Record offsets and delivery counts:**
```
Offset: 10, 11, 12, 13, 14, 15
Delivery Count: 1, 2, 3, 2, 2, 1
```
(Assuming `BAD_RECORD_DELIVERY_THRESHOLD = 3` - offset 12 is a bad record)
### Before Changes:
- **Case 1:** `fetchStartOffset = 10`, `maxFetchRecords = 6`
- Returns records from offset 10-15 (including the bad record at offset 12)
- **Case 2:** `fetchStartOffset = 12`, `maxFetchRecords = 6`
- Returns records from offset 12-15 (starting with the bad record)
### After Changes:
- **Case 1:** `fetchStartOffset = 10`, `maxFetchRecords = 6`
- Returns records from offset 10-11 only (stops at the first bad record)
- **Case 2:** `fetchStartOffset = 12`, `maxFetchRecords = 6`
- Returns only offset 12 (the bad record itself, since there are no
preceding good records)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]