DL1231 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2540431451
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
continue;
}
- InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+ int recordDeliveryCount = offsetState.getValue().deliveryCount();
+ // On last delivery attempt, submit acquired records,
+ // bad record will be delivered alone next time
+ if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+     hasBadRecord = true;
+     break;
+ }
Review Comment:
The former check (lines 1912-1917) handles the case where the record about to
be acquired is on its last delivery attempt and other records have already
been acquired: acquisition stops and those records are returned immediately,
so the last-attempt record is delivered alone next time.
The latter check (lines 1944-1949) handles the case where a record that has
already been acquired is itself on its last attempt: that record is returned
immediately so that it is delivered individually.
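
To make the interaction of the two checks concrete, here is a minimal standalone sketch. It is not the `SharePartition` code: the class `LastAttemptIsolationSketch`, the method `acquire`, the `from` parameter, and the plain `int[]` of delivery counts are all hypothetical stand-ins, and the exact condition used by the latter check in the PR is an assumption (only the former check's condition appears in the hunk above).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the two checks; not the Kafka implementation.
class LastAttemptIsolationSketch {

    /**
     * Acquires records starting at index `from` and returns the acquired indexes.
     * deliveryCounts[i] is the number of times record i has been delivered so far.
     */
    public static List<Integer> acquire(int[] deliveryCounts, int from, int maxDeliveryCount) {
        List<Integer> acquired = new ArrayList<>();
        for (int i = from; i < deliveryCounts.length; i++) {
            // Same shape as the condition in the diff: the record is about to
            // start its final delivery attempt.
            boolean lastAttempt = maxDeliveryCount > 2
                && deliveryCounts[i] == maxDeliveryCount - 1;

            // Former check: something is already acquired, so stop here and
            // leave the last-attempt record for the next pass.
            if (lastAttempt && !acquired.isEmpty()) {
                break;
            }

            acquired.add(i);
            deliveryCounts[i]++; // stands in for DeliveryCountOps.INCREASE

            // Latter check (assumed form): the record just acquired is on its
            // last attempt, so return it without acquiring anything after it.
            if (lastAttempt) {
                break;
            }
        }
        return acquired;
    }

    public static void main(String[] args) {
        int[] counts = {0, 0, 4, 0};
        int max = 5;
        System.out.println(acquire(counts, 0, max)); // [0, 1]
        System.out.println(acquire(counts, 2, max)); // [2]
        System.out.println(acquire(counts, 3, max)); // [3]
    }
}
```

In this model, the first pass stops in front of record 2 because of the former check, and the second pass returns record 2 by itself because of the latter check, so a record on its final attempt never shares a batch with its neighbours.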