DL1231 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2540379517
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
// In record_limit mode, we need to ensure that we do not acquire more than
// maxRecordsToAcquire. Hence, pass the remaining number of records that can
// be acquired.
- int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
- acquiredCount += acquiredSubsetCount;
+ BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+     numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+ acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+ // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+ // to prevent acquiring any new records afterwards.
+ if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {
Review Comment:
The check above only inspects `deliveryCount` to determine whether the batch
contains bad records. Depending on each record's offset or state, however,
those bad records might not actually be acquired in this acquisition pass.
This additional check therefore confirms that a bad record has truly been
acquired.
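
To illustrate the distinction, here is a minimal, self-contained sketch. It is not the code in `SharePartition`: the `OffsetState` type, the `State` enum, the threshold parameter, and both helper methods are hypothetical names made up for this example, and the offset-range and state rules are simplified assumptions. Only the idea of returning a marker together with the acquired count follows the diff above.

```java
import java.util.List;

// Hypothetical, simplified model; not the actual SharePartition implementation.
class BadRecordCheckSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    // Assumed per-offset view of an in-flight batch.
    record OffsetState(long offset, State state, int deliveryCount) { }

    // Same shape as the result type introduced in the diff: a marker plus a count.
    record Result(boolean badRecordMarker, int acquiredCount) { }

    // Batch-level pre-check: looks at deliveryCount only, so it can report a
    // suspected bad record that this acquisition pass will never touch.
    static boolean batchMayContainBadRecord(List<OffsetState> batch, int threshold) {
        return batch.stream().anyMatch(r -> r.deliveryCount() >= threshold);
    }

    // Per-record pass: the marker is set only when a suspected bad record is
    // inside the requested offset range AND in an acquirable state.
    static Result acquireSubset(List<OffsetState> batch, long firstOffset,
                                long lastOffset, int threshold) {
        boolean badRecordAcquired = false;
        int acquiredCount = 0;
        for (OffsetState r : batch) {
            if (r.offset() < firstOffset || r.offset() > lastOffset) {
                continue; // outside the range being acquired
            }
            if (r.state() != State.AVAILABLE) {
                continue; // already acquired or acknowledged
            }
            acquiredCount++;
            if (r.deliveryCount() >= threshold) {
                badRecordAcquired = true;
            }
        }
        return new Result(badRecordAcquired, acquiredCount);
    }
}
```

In this sketch, a batch whose only high-`deliveryCount` records are already acquired or lie beyond the last requested offset passes the batch-level pre-check, yet `acquireSubset` returns `badRecordMarker == false`, so the caller would not need to stop acquiring.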