apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2542693260


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,15 @@ public ShareAcquiredRecords acquire(
                 boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
                 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
                 boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
-                if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+                boolean throttleRecordsDelivery = shouldThrottleRecordsDelivery(inFlightBatch);
+                // Stop acquiring more records if bad record found after acquiring some data to
+                // prevent affecting already acquired records

Review Comment:
   ```suggestion
                   // Stop acquiring more records if record delivery has to be throttled. Throttling prevents
                   // the complete batch from being archived when a single record is corrupt.
   ```
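   To illustrate the reasoning behind that comment, here is a minimal sketch of a deliberately simplified, hypothetical redelivery model (not the `SharePartition` API): every delivery that contains the corrupt record fails as a whole, each delivered record's count is incremented, and a record is archived once its count reaches `maxDeliveryCount`. The last-attempt condition mirrors the one in `acquireSubsetBatchRecords`; `ThrottlingSimulation` and `simulate` are made-up names.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Simplified, hypothetical model of share-group redelivery; not Kafka's SharePartition code.
   final class ThrottlingSimulation {

       // Delivers records until each one is acknowledged or archived and returns the
       // number of archived records. Any delivery containing corruptOffset fails as a whole.
       static int simulate(int numRecords, int corruptOffset, int maxDeliveryCount, boolean throttle) {
           int[] deliveryCount = new int[numRecords];
           boolean[] done = new boolean[numRecords]; // acknowledged or archived
           int remaining = numRecords;
           int archived = 0;
           while (remaining > 0) {
               // Acquire available records in offset order.
               List<Integer> acquired = new ArrayList<>();
               for (int offset = 0; offset < numRecords; offset++) {
                   if (done[offset]) {
                       continue;
                   }
                   // Mirrors the diff: the next delivery would be this record's last attempt.
                   boolean lastAttempt = maxDeliveryCount > 2 && deliveryCount[offset] == maxDeliveryCount - 1;
                   if (throttle && lastAttempt && !acquired.isEmpty()) {
                       break; // hand back what was acquired; the suspect record goes alone next time
                   }
                   acquired.add(offset);
                   if (throttle && lastAttempt) {
                       break; // the suspect record is delivered on its own
                   }
               }
               boolean failed = acquired.contains(corruptOffset);
               for (int offset : acquired) {
                   deliveryCount[offset]++;
                   if (!failed) {
                       done[offset] = true; // acknowledged
                       remaining--;
                   } else if (deliveryCount[offset] >= maxDeliveryCount) {
                       done[offset] = true; // delivery attempts exhausted: archived
                       archived++;
                       remaining--;
                   }
               }
           }
           return archived;
       }
   }
   ```

   With 5 records, the corrupt one at offset 2 and `maxDeliveryCount = 5`, this model archives all 5 records without throttling and only the corrupt record with throttling.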



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult =  offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time
+                if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+                    hasBadRecord = true;
+                    break;
+                }

Review Comment:
   So both checks are the same, or am I misreading it? In which scenario can the above condition execute?
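   A sketch that isolates the per-record check suggests one case where it could fire: a batch with per-offset state whose offsets have different delivery counts, so the offset on its last attempt is not the first one the loop visits. It assumes that `acquiredCount` inside `acquireSubsetBatchRecords` counts offsets already acquired from the same batch in this call; `SubsetBatchCheckSketch` and `acquireSubset` are hypothetical names.

   ```java
   // Hypothetical stand-in for the offset loop in acquireSubsetBatchRecords.
   final class SubsetBatchCheckSketch {

       // offsetDeliveryCounts[i] is the delivery count of the i-th offset in the batch.
       // Returns how many offsets are acquired before the loop stops.
       static int acquireSubset(int[] offsetDeliveryCounts, int maxDeliveryCount) {
           int acquiredCount = 0;
           for (int recordDeliveryCount : offsetDeliveryCounts) {
               // Same condition as in the diff: the next delivery would be the last attempt
               // and some offsets of this batch were already acquired in this call.
               if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
                   break; // the suspect offset is left for a later delivery
               }
               acquiredCount++;
           }
           return acquiredCount;
       }
   }
   ```

   With delivery counts `{2, 2, 4, 1}` and `maxDeliveryCount = 5`, it acquires two offsets and stops before the third.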



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
                     // In record_limit mode, we need to ensure that we do not acquire more than
                     // maxRecordsToAcquire. Hence, pass the remaining number of records that can
                     // be acquired.
-                    int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
-                    acquiredCount += acquiredSubsetCount;
+                    BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+                        numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+                    acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+                    // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+                    // to prevent acquiring any new records afterwards.
+                    // to prevent acquiring any new records afterwards.
+                    if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {

Review Comment:
   OK, in that case you can check whether the count returned by `acquireSubsetBatchRecords` is > 0; that indicates whether some records have been acquired from the batch that needs throttling, doesn't it?
   
   I am just trying to simplify things here so the code is more readable and maintainable; let me know your thoughts.
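   A simplified sketch of that suggestion: `acquireSubsetBatchRecords` keeps returning a plain `int`, and the caller, which already has `throttleRecordsDelivery` for the batch, stops when that flag is set and the returned count is > 0. The `Batch` record, the stand-in method body and the class name are hypothetical; the real method takes more parameters and works per offset.

   ```java
   import java.util.List;

   final class CountOnlyAcquireSketch {

       // Hypothetical in-flight batch: acquirable records and whether delivery must be throttled.
       record Batch(int acquirableRecords, boolean throttleRecordsDelivery) {}

       // Stand-in for acquireSubsetBatchRecords returning a plain int instead of
       // BadRecordMarkerAndAcquiredCount.
       static int acquireSubsetBatchRecords(Batch batch, int numRecordsRemaining) {
           return Math.min(batch.acquirableRecords(), numRecordsRemaining);
       }

       static int acquire(List<Batch> batches, int maxRecordsToAcquire) {
           int acquiredCount = 0;
           for (Batch batch : batches) {
               int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
               if (numRecordsRemaining <= 0) {
                   break;
               }
               int acquiredSubsetCount = acquireSubsetBatchRecords(batch, numRecordsRemaining);
               acquiredCount += acquiredSubsetCount;
               // The caller already knows the batch needs throttling, so a non-zero count
               // from it is enough to stop; no separate marker has to be returned.
               if (batch.throttleRecordsDelivery() && acquiredSubsetCount > 0) {
                   break;
               }
           }
           return acquiredCount;
       }
   }
   ```

   With batches of 3 (not throttled), 2 (throttled) and 4 (not throttled) records, the loop acquires 5 records and stops after the throttled batch.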



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to