apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2542693260
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,15 @@ public ShareAcquiredRecords acquire(
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
- if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+ boolean throttleRecordsDelivery = shouldThrottleRecordsDelivery(inFlightBatch);
+ // Stop acquiring more records if bad record found after acquiring some data to
+ // prevent affecting already acquired records
Review Comment:
```suggestion
// Stop acquiring more records if record delivery has to be throttled. Throttling prevents
// the complete batch from being archived when a single record is corrupt.
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
continue;
}
- InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+ int recordDeliveryCount = offsetState.getValue().deliveryCount();
+ // On last delivery attempt, submit acquired records,
+ // bad record will be delivered alone next time
+ if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+ hasBadRecord = true;
+ break;
+ }
Review Comment:
So both the checks are the same, or am I misreading it? In which scenario can the above condition execute?
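To make the question concrete, the condition added in `acquireSubsetBatchRecords` can be exercised in isolation. This is a minimal sketch, not the PR's code: `ThrottleCheckSketch` and `shouldStopBeforeRecord` are hypothetical names, and only the boolean expression is taken from the diff.

```java
public class ThrottleCheckSketch {

    // Hypothetical helper mirroring the condition added in acquireSubsetBatchRecords.
    // Returns true when the loop would break before acquiring this record, i.e. the
    // record is about to get its last delivery attempt and other records were already acquired.
    static boolean shouldStopBeforeRecord(int maxDeliveryCount, int recordDeliveryCount, int acquiredCount) {
        return maxDeliveryCount > 2
            && recordDeliveryCount == maxDeliveryCount - 1
            && acquiredCount > 0;
    }

    public static void main(String[] args) {
        // maxDeliveryCount = 5: a record delivered 4 times so far is on its last attempt.
        System.out.println(shouldStopBeforeRecord(5, 4, 3)); // true: other records were acquired first
        System.out.println(shouldStopBeforeRecord(5, 4, 0)); // false: the record is acquired on its own
        System.out.println(shouldStopBeforeRecord(5, 2, 3)); // false: not the last attempt yet
        System.out.println(shouldStopBeforeRecord(2, 1, 3)); // false: check is skipped when maxDeliveryCount <= 2
    }
}
```

If the other check in the method reduces to the same truth table for these inputs, one of the two is redundant.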
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
// In record_limit mode, we need to ensure that we do not acquire more than
// maxRecordsToAcquire. Hence, pass the remaining number of records that can
// be acquired.
- int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
- acquiredCount += acquiredSubsetCount;
+ BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+ numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+ acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+ // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+ // to prevent acquiring any new records afterwards.
+ if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {
Review Comment:
OK, in that case you can check whether the count returned from `acquireSubsetBatchRecords` is > 0. That indicates that some records have been acquired from the batch which needs throttling, doesn't it?
I am just trying to simplify things here so the code is more readable and maintainable; let me know your thoughts.
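A minimal sketch of the suggested simplification, assuming `acquireSubsetBatchRecords` goes back to returning a plain `int` and the caller already holds the `throttleRecordsDelivery` flag from `shouldThrottleRecordsDelivery(inFlightBatch)`. `SimplifiedAcquireSketch`, `Batch` and `acquire` are hypothetical stand-ins, not the real `SharePartition` code; the PR also returns immediately and sets `maxRecordsToAcquire = -1`, which the sketch reduces to a `break`.

```java
import java.util.List;

public class SimplifiedAcquireSketch {

    // Hypothetical batch: the count acquireSubsetBatchRecords would return for it,
    // plus the flag computed by shouldThrottleRecordsDelivery(inFlightBatch).
    record Batch(int acquiredFromBatch, boolean throttleRecordsDelivery) {}

    // Caller-side loop: no BadRecordMarkerAndAcquiredCount wrapper. The caller stops
    // once a batch that needs throttling has contributed any records.
    static int acquire(List<Batch> batches) {
        int acquiredCount = 0;
        for (Batch batch : batches) {
            int acquired = batch.acquiredFromBatch(); // stands in for the int return value
            acquiredCount += acquired;
            if (batch.throttleRecordsDelivery() && acquired > 0) {
                break; // do not acquire from later batches
            }
        }
        return acquiredCount;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(new Batch(10, false), new Batch(3, true), new Batch(10, false));
        System.out.println(acquire(batches)); // 13: stops after the throttled batch
    }
}
```

The trade-off is that the "bad record hit" signal is inferred from the flag and the count rather than carried explicitly, which only holds if a throttled batch never returns records without having hit the last-attempt condition.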
--
This is an automated message from the Apache Git Service.