DL1231 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2648789397
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1914,29 @@ private int acquireSubsetBatchRecords(
continue;
}
-            InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+            int recordDeliveryCount = offsetState.getValue().deliveryCount();
+            // If the record is on last delivery attempt then isolate that record to be delivered alone.
+            // If the respective record is corrupt then it prevents increasing delivery count of multiple
+            // records in a single response batch. Condition below checks if the current record has reached
+            // the delivery limit and already have some records to return in response then skip processing
+            // the current record, which shall be delivered alone in next fetch.
+            if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+                break;
Review Comment:
Thanks for the suggestion. You're right, that's a valid concern. Adding a
WARN log will make it clear to users that this is intentional behavior,
which should prevent unnecessary alarm.
Additionally, I think it's a good idea to introduce a configuration flag to
enable or disable this back-off mechanism. This would give users the
flexibility to choose between:
- Prioritizing throughput (by disabling the feature) for scenarios where
timeliness is critical.
- Prioritizing robustness (by keeping it enabled) to safely handle and skip
corrupted records.
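To make the trade-off concrete, here is a minimal standalone sketch of how such a flag could gate the isolation check. The flag and the helper method (`isolationEnabled`, `shouldIsolate`) are hypothetical names for illustration, not part of this PR; the predicate mirrors the condition in the diff above.

```java
// Illustrative sketch only; names are hypothetical, not from the PR.
public class LastDeliveryIsolation {

    /**
     * Returns true when the current record should be deferred and delivered
     * alone in a later fetch: the feature is enabled, the record is on its
     * last delivery attempt, and the response already has acquired records.
     */
    static boolean shouldIsolate(boolean isolationEnabled, int maxDeliveryCount,
                                 int recordDeliveryCount, int acquiredCount) {
        return isolationEnabled
            && maxDeliveryCount > 2
            && recordDeliveryCount == maxDeliveryCount - 1
            && acquiredCount > 0;
    }

    public static void main(String[] args) {
        // Enabled: a record on its last attempt, with records already
        // acquired, is isolated (robustness).
        System.out.println(shouldIsolate(true, 5, 4, 3));
        // Disabled: the record is batched normally (throughput).
        System.out.println(shouldIsolate(false, 5, 4, 3));
    }
}
```

With the flag disabled the predicate short-circuits and the record stays in the current batch, so the existing behavior is preserved for users who prioritize timeliness.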
We can discuss the new configuration option further if you think it's a good
addition.
Thanks again for the feedback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]