apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2546730167


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1866,6 +1888,8 @@ private int acquireSubsetBatchRecords(
     ) {
         lock.writeLock().lock();
         int acquiredCount = 0;
+        long maxFetchRecordsWhileBadRecord = -1;

Review Comment:
   ```suggestion
           long maxFetchRecordsWhileThrottledRecords = -1;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1909,27 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = 
offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time
+                if (maxDeliveryCount > 2 && recordDeliveryCount == 
maxDeliveryCount - 1 && acquiredCount > 0) {
+                    hasBadRecord = true;
+                    break;
+                }
+
+                // When record delivery count reach the throttle threshold, 
progressively reduce batch size to isolate bad records.

Review Comment:
   I won't prefer to use `bad` as we should just talk about the records which 
have approached the delivery limit and we send individual records.
   ```suggestion
                   // When record delivery count reach the throttle threshold, 
progressively reduce batch size to isolate records.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1909,27 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = 
offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time

Review Comment:
   ```suggestion
                   // If the record is on last delivery attempt then isolate 
that record to be delivered alone.
                   // If the respective record is corrupt then it prevents 
increasing delivery count of multiple
                   // records in a single response batch. Condition below 
checks if the current record has reached
                   // the delivery limit and already have some records to 
return in response then skip processing
                   // the current record, which shall be delivered alone in 
next fetch.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +872,16 @@ public ShareAcquiredRecords acquire(
                     // In record_limit mode, we need to ensure that we do not 
acquire more than
                     // maxRecordsToAcquire. Hence, pass the remaining number 
of records that can
                     // be acquired.
-                    int acquiredSubsetCount = 
acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, 
firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
-                    acquiredCount += acquiredSubsetCount;
+                    BadRecordMarkerAndAcquiredCount 
badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, 
isRecordLimitMode,
+                        numRecordsRemaining, firstBatch.baseOffset(), 
lastOffsetToAcquire, inFlightBatch, result);
+
+                    acquiredCount += 
badRecordMarkerAndAcquiredCount.acquiredCount();
+                    // If a bad record is present, return immediately and set 
`maxRecordsToAcquire = 0`
+                    // to prevent acquiring any new records afterwards.

Review Comment:
   ```suggestion
                       // If records are throttled, return immediately and set 
`maxRecordsToAcquire = 0`
                       // to prevent acquiring any new records afterwards.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
                     // In record_limit mode, we need to ensure that we do not 
acquire more than
                     // maxRecordsToAcquire. Hence, pass the remaining number 
of records that can
                     // be acquired.
-                    int acquiredSubsetCount = 
acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, 
firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
-                    acquiredCount += acquiredSubsetCount;
+                    BadRecordMarkerAndAcquiredCount 
badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, 
isRecordLimitMode,
+                        numRecordsRemaining, firstBatch.baseOffset(), 
lastOffsetToAcquire, inFlightBatch, result);
+
+                    acquiredCount += 
badRecordMarkerAndAcquiredCount.acquiredCount();
+                    // If a bad record is present, return immediately and set 
`maxRecordsToAcquire = -1`
+                    // to prevent acquiring any new records afterwards.
+                    if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {

Review Comment:
   Correct but in the example presented ideally we should not return true from 
`shouldThrottleRecordsDelivery`. So if we change our condition in method  
`shouldThrottleRecordsDelivery` to below then we can avoid same:
   
   ```
   inFlightBatch.offsetState().values().stream().filter(offsetState -> 
offsetState.state() == 
RecordState.AVAILABLE).mapToInt(InFlightState::deliveryCount).max().orElse(0) 
>= throttleRecordsDeliveryLimit;



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = 
offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time
+                if (maxDeliveryCount > 2 && recordDeliveryCount == 
maxDeliveryCount - 1 && acquiredCount > 0) {
+                    hasBadRecord = true;
+                    break;
+                }

Review Comment:
   Though you are right with the isolation and it will be in line with the 
checks you have for batches, where also we are checking twice, before and 
after. However for offsets this should not be the case, ideally, unless some 
offsets are specifically released. Having said that, I think let's keep the way 
you have implemented to isolate single records on approaching the delivery 
limits.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1866,6 +1888,8 @@ private int acquireSubsetBatchRecords(
     ) {
         lock.writeLock().lock();
         int acquiredCount = 0;
+        long maxFetchRecordsWhileBadRecord = -1;
+        boolean hasBadRecord = false;

Review Comment:
   ```suggestion
           boolean hasThrottledRecord = false;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -362,6 +367,7 @@ enum SharePartitionState {
         this.leaderEpoch = leaderEpoch;
         this.maxInFlightRecords = maxInFlightRecords;
         this.maxDeliveryCount = maxDeliveryCount;
+        this.throttleRecordsDeliveryLimit = Math.max(2, (int) 
Math.ceil((double) maxDeliveryCount / 2));

Review Comment:
   Define this as a constant and use elsewhere 
`MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT`.
   ```suggestion
           this.throttleRecordsDeliveryLimit = 
Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double) 
maxDeliveryCount / 2));
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,15 @@ public ShareAcquiredRecords acquire(
                 boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastOffsetToAcquire);
                 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
                 boolean recordLimitSubsetMatch = isRecordLimitMode && 
checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, 
acquiredCount);
-                if (!fullMatch || inFlightBatch.offsetState() != null || 
recordLimitSubsetMatch) {
+                boolean throttleRecordsDelivery = 
shouldThrottleRecordsDelivery(inFlightBatch);
+                // Stop acquiring more records if records delivery has to be 
throttled. The throttling prevents
+                // complete batch to be archived in case of a single record 
being corrupt.

Review Comment:
   ```suggestion
                   // Stop acquiring more records if records delivery has to be 
throttled. The throttling prevents
                   // complete batch to be archived in case of a single record 
being corrupt.
                   // Below check isolates the current batch/offsets to be 
delivered individually in subsequent fetches.
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void 
testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testSkipBadRecordWhenPendingDeliveriesExist() {

Review Comment:
   ```suggestion
       public void testThrottleRecordsWhenPendingDeliveriesExist() {
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1116,9 +1116,9 @@ public void testMaybeInitializeAndAcquire() {
             new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
                 PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
                     List.of(
-                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 0),

Review Comment:
   Why to change it from `2`? I understand changing below 30L offset from 3 
else that would have triggered the throttling.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void 
testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testSkipBadRecordWhenPendingDeliveriesExist() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 7, 13).close();
+        memoryRecordsBuilder(buffer, 20, 8).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
offset 26 of the fetched batch.
+        // 16 records should be returned: 7-19, 23-25
+        // The record at offset 26 has a delivery count of 3 and is a bad 
record; it should be skipped.

Review Comment:
   ```suggestion
           // The record at offset 26 has a delivery count of 3 and is a 
subject to be throttled; it should be skipped.
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void 
testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testSkipBadRecordWhenPendingDeliveriesExist() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 7, 13).close();
+        memoryRecordsBuilder(buffer, 20, 8).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
offset 26 of the fetched batch.
+        // 16 records should be returned: 7-19, 23-25
+        // The record at offset 26 has a delivery count of 3 and is a bad 
record; it should be skipped.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            16);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(26, sharePartition.nextFetchOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertEquals(3, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void 
testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() 
throws InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 34L, 
RecordState.AVAILABLE.id, (short) 4)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withMaxDeliveryCount(7)
+            
.withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+            .build();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(34, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // The record at offset 15 has a delivery count of 4 and is a bad 
record
+        // First acquisition attempt fails: batch size should be halved (20 -> 
10)
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        // Second failure: batch size halved again (now ~1/4 of original, 20 
-> 5)
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        // Final delivery attempt: only the suspected bad record should be 
acquired
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            1);
+    }

Review Comment:
   We do not have any tests for the scenarios mentioned here: 
https://github.com/apache/kafka/pull/20837/files#r2544029799
   and https://github.com/apache/kafka/pull/20837/files#r2543997891



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void 
testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testSkipBadRecordWhenPendingDeliveriesExist() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 7, 13).close();
+        memoryRecordsBuilder(buffer, 20, 8).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
offset 26 of the fetched batch.
+        // 16 records should be returned: 7-19, 23-25
+        // The record at offset 26 has a delivery count of 3 and is a bad 
record; it should be skipped.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            16);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(26, sharePartition.nextFetchOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertEquals(3, sharePartition.deliveryCompleteCount());

Review Comment:
   Can we refetch with the same buffer by calling `fetchAcquiredRecords` and 
check both offset 26,27 will be delivered?



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1239,9 +1239,9 @@ public void 
testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() {
             new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
                 PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
                     List.of(
-                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 0),

Review Comment:
   Same as above.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void 
testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testSkipBadRecordWhenPendingDeliveriesExist() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 7, 13).close();
+        memoryRecordsBuilder(buffer, 20, 8).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
offset 26 of the fetched batch.
+        // 16 records should be returned: 7-19, 23-25
+        // The record at offset 26 has a delivery count of 3 and is a bad 
record; it should be skipped.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            16);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(26, sharePartition.nextFetchOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertEquals(3, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void 
testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() 
throws InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 34L, 
RecordState.AVAILABLE.id, (short) 4)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withMaxDeliveryCount(7)
+            
.withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+            .build();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(34, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // The record at offset 15 has a delivery count of 4 and is a bad 
record
+        // First acquisition attempt fails: batch size should be halved (20 -> 
10)
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        // Second failure: batch size halved again (now ~1/4 of original, 20 
-> 5)
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        // Final delivery attempt: only the suspected bad record should be 
acquired
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            1);

Review Comment:
   Can we call this in a loop of 10 to verify all next set of records are 
delivered individually. Verify the acquired offset returned as well. And later 
verify the other 10 remaining ones are delivered correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to