apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2504579510
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -150,6 +150,12 @@ enum SharePartitionState {
AcknowledgeType.REJECT.id, RecordState.ARCHIVED
);
+ /**
+ * Records whose delivery count exceeds this are deemed abnormal,
+ * and the batching of these records should be reduced.
+ */
+ private static final int BAD_RECORD_DELIVERY_THRESHOLD = 2;
Review Comment:
Maybe use 3 as the default?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,6 +1899,11 @@ private int acquireSubsetBatchRecords(
continue;
}
+ // If the record has any pending deliveries, return immediately and do not deliver the current bad record.
+ if (offsetState.getValue().deliveryCount() >= BAD_RECORD_DELIVERY_THRESHOLD && (hasBeenAcquired > 0 || acquiredCount > 0)) {
+ return -acquiredCount;
+ }
+
Review Comment:
Sorry, I didn't understand: what are we verifying here?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,7 @@ public ShareAcquiredRecords acquire(
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
- if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+ if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch || inFlightBatch.batchDeliveryCount() >= 2) {
Review Comment:
We have defined the constant `BAD_RECORD_DELIVERY_THRESHOLD` above, but the literal `2` is used directly here.
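To illustrate the point about the unused constant, here is a minimal standalone sketch of the same condition written against `BAD_RECORD_DELIVERY_THRESHOLD` instead of the literal. The class `ThresholdCheckSketch` and the helper `acquireConditionHolds` are hypothetical names used only for this example and are not part of `SharePartition`; the boolean parameters stand in for the expressions in the diff.

```java
public class ThresholdCheckSketch {
    // Same name and value as the constant added in the PR diff.
    private static final int BAD_RECORD_DELIVERY_THRESHOLD = 2;

    // Hypothetical helper: evaluates the condition from the diff, with the
    // delivery-count comparison using the named constant rather than `2`.
    // hasOffsetState stands in for `inFlightBatch.offsetState() != null`.
    public static boolean acquireConditionHolds(boolean fullMatch,
                                                boolean hasOffsetState,
                                                boolean recordLimitSubsetMatch,
                                                int batchDeliveryCount) {
        return !fullMatch
            || hasOffsetState
            || recordLimitSubsetMatch
            || batchDeliveryCount >= BAD_RECORD_DELIVERY_THRESHOLD;
    }

    public static void main(String[] args) {
        // Full match, no offset state, delivery count below the threshold.
        System.out.println(acquireConditionHolds(true, false, false, 1)); // false
        // Same batch once the delivery count reaches the threshold.
        System.out.println(acquireConditionHolds(true, false, false, 2)); // true
    }
}
```

With the constant in place, changing the threshold (for example to 3) would only need one edit.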
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,7 @@ public ShareAcquiredRecords acquire(
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
- if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+ if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch || inFlightBatch.batchDeliveryCount() >= 2) {
Review Comment:
Also, can you help explain the reasoning behind the added condition?