AndrewJSchofield commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1967282787


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),
+            new ShareAcknowledgementBatch(15, 16, List.of((byte) 2 /* RELEASE 
*/))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+    }
+
+    /**
+     * Test the case where the available cached batches never appear again in 
fetch response within the
+     * previous fetch offset range. Also remove records from the previous 
fetch batches.
+     * <p>
+     * Such case can arise with compacted topics where complete batches are 
removed or records within
+     * batches are removed.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch 
response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for 0 offset batch starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include batch from offset 5. And compact batch starting at 
offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for 20 offset batch starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include batch from offset 45. And compact the batch at 
offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for 50 offset batch starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records for in middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines 
the acquisition range using
+        // the first and last offsets of the fetched batches and acquires all 
available cached batches
+        // within that range. That means the batch from offset 45-49 which is 
not included in the
+        // fetch response will also be acquired. Similarly, for the batch from 
offset 5-19 which is
+        // anyway in the bigger cached batch of 0-34, will also be acquired. 
This avoids iterating
+        // through individual fetched batch boundaries; the client is 
responsible for reporting any
+        // data gaps via acknowledgements. This test also covers the edge case 
where the last fetched
+        // batch is compacted, and its last offset is before the previously 
cached version's last offset.
+        // In this situation, the last batch's offset state tracking is 
initialized. This is handled
+        // correctly because the client will send individual offset 
acknowledgements, which require offset
+        // state tracking anyway. While this last scenario is unlikely in 
practice (as a batch's reported
+        // last offset should remain correct even after compaction), the test 
verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses (starting
+     * from the fetchOffset) are correctly archived. Archiving these batches 
is crucial for the SPSO
+     * and the next fetch offset to advance. Without archiving, these offsets 
would be stuck, as the
+     * cached batches would remain available.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, 
previously held in the cache,
+     * are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 3 entries.
+        assertEquals(3, sharePartition.cachedState().size());
+
+        // Compact second batch and remove first batch from the fetch response.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for 5th offset batch starting from offset 
6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Only second batch should be acquired and first batch offsets should 
be archived. Send
+        // fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // Though the next fetch offset is moved but start offset should 
remain the same as we acquire
+        // just marks the offsets archived. The start offset shall move 
correctly once any records are acknowledged.
+        assertEquals(0, sharePartition.startOffset());
+
+        // Even invoking the release API shall update the cache and move the 
start offset.

Review Comment:
   I think this is better as "Releasing acquired records updates the cache and moves the start offset."
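   For reference, a sketch of the suggested wording applied at that spot (the surrounding lines are as quoted later in this review):

   ```java
   // Releasing acquired records updates the cache and moves the start offset.
   sharePartition.releaseAcquiredRecords(MEMBER_ID);
   assertEquals(5, sharePartition.startOffset());
   assertEquals(5, sharePartition.nextFetchOffset());
   ```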



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),
+            new ShareAcknowledgementBatch(15, 16, List.of((byte) 2 /* RELEASE 
*/))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+    }
+
+    /**
+     * Test the case where the available cached batches never appear again in 
fetch response within the
+     * previous fetch offset range. Also remove records from the previous 
fetch batches.
+     * <p>
+     * Such case can arise with compacted topics where complete batches are 
removed or records within
+     * batches are removed.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch 
response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for 0 offset batch starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include batch from offset 5. And compact batch starting at 
offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for 20 offset batch starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include batch from offset 45. And compact the batch at 
offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for 50 offset batch starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records for in middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines 
the acquisition range using
+        // the first and last offsets of the fetched batches and acquires all 
available cached batches
+        // within that range. That means the batch from offset 45-49 which is 
not included in the
+        // fetch response will also be acquired. Similarly, for the batch from 
offset 5-19 which is
+        // anyway in the bigger cached batch of 0-34, will also be acquired. 
This avoids iterating
+        // through individual fetched batch boundaries; the client is 
responsible for reporting any
+        // data gaps via acknowledgements. This test also covers the edge case 
where the last fetched
+        // batch is compacted, and its last offset is before the previously 
cached version's last offset.
+        // In this situation, the last batch's offset state tracking is 
initialized. This is handled
+        // correctly because the client will send individual offset 
acknowledgements, which require offset
+        // state tracking anyway. While this last scenario is unlikely in 
practice (as a batch's reported
+        // last offset should remain correct even after compaction), the test 
verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses (starting
+     * from the fetchOffset) are correctly archived. Archiving these batches 
is crucial for the SPSO
+     * and the next fetch offset to advance. Without archiving, these offsets 
would be stuck, as the
+     * cached batches would remain available.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, 
previously held in the cache,
+     * are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 3 entries.
+        assertEquals(3, sharePartition.cachedState().size());
+
+        // Compact second batch and remove first batch from the fetch response.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for 5th offset batch starting from offset 
6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Only second batch should be acquired and first batch offsets should 
be archived. Send
+        // fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // Though the next fetch offset is moved but start offset should 
remain the same as we acquire
+        // just marks the offsets archived. The start offset shall move 
correctly once any records are acknowledged.
+        assertEquals(0, sharePartition.startOffset());
+
+        // Even invoking the release API shall update the cache and move the 
start offset.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+        // Validate first batch has been removed from the cache.
+        assertEquals(2, sharePartition.cachedState().size());
+        sharePartition.cachedState().forEach((offset, inFlightState) -> {
+            assertNotNull(inFlightState.batchState());
+            assertEquals(RecordState.AVAILABLE, inFlightState.batchState());
+        });
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses are
+     * correctly archived, when fetchOffset is within an already cached batch. 
Archiving these batches/offsets
+     * is crucial for the SPSO and the next fetch offset to advance.
+     * <p>
+     * This scenario can occur with compacted topics when fetch triggers from 
an offset which is within
+     * a cached batch, and respective batch is removed from the log.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Acknowledge subset of the first batch offsets.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            // Accept the 3 offsets of first batch.
+            new ShareAcknowledgementBatch(5, 7, List.of((byte) 1)))).join();
+        // Release the remaining batches/offsets in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID).join();
+        // Validate cache has 2 entries.
+        assertEquals(2, sharePartition.cachedState().size());
+
+        // Mark fetch offset within the first batch to 8, first available 
offset.
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 8, 0, 15);
+        assertEquals(25, sharePartition.nextFetchOffset());
+        // Though the next fetch offset is moved but start offset should 
remain the same as we acquire

Review Comment:
   Same comment here.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),
+            new ShareAcknowledgementBatch(15, 16, List.of((byte) 2 /* RELEASE 
*/))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+    }
+
+    /**
+     * Test the case where the available cached batches never appear again in 
fetch response within the
+     * previous fetch offset range. Also remove records from the previous 
fetch batches.
+     * <p>
+     * Such case can arise with compacted topics where complete batches are 
removed or records within
+     * batches are removed.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch 
response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for 0 offset batch starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include batch from offset 5. And compact batch starting at 
offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for 20 offset batch starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include batch from offset 45. And compact the batch at 
offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for 50 offset batch starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records for in middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines 
the acquisition range using
+        // the first and last offsets of the fetched batches and acquires all 
available cached batches
+        // within that range. That means the batch from offset 45-49 which is 
not included in the
+        // fetch response will also be acquired. Similarly, for the batch from 
offset 5-19 which is
+        // anyway in the bigger cached batch of 0-34, will also be acquired. 
This avoids iterating
+        // through individual fetched batch boundaries; the client is 
responsible for reporting any
+        // data gaps via acknowledgements. This test also covers the edge case 
where the last fetched
+        // batch is compacted, and its last offset is before the previously 
cached version's last offset.
+        // In this situation, the last batch's offset state tracking is 
initialized. This is handled
+        // correctly because the client will send individual offset 
acknowledgements, which require offset
+        // state tracking anyway. While this last scenario is unlikely in 
practice (as a batch's reported
+        // last offset should remain correct even after compaction), the test 
verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses (starting
+     * from the fetchOffset) are correctly archived. Archiving these batches 
is crucial for the SPSO
+     * and the next fetch offset to advance. Without archiving, these offsets 
would be stuck, as the
+     * cached batches would remain available.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, 
previously held in the cache,
+     * are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 3 entries.
+        assertEquals(3, sharePartition.cachedState().size());
+
+        // Compact second batch and remove first batch from the fetch response.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for 5th offset batch starting from offset 
6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Only second batch should be acquired and first batch offsets should 
be archived. Send
+        // fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // Though the next fetch offset is moved but start offset should 
remain the same as we acquire

Review Comment:
   I don't quite understand this comment. I think the grammar needs a tweak.
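   One possible rewording, assuming the intent is that acquire only archives the stale offsets and the SPSO moves later:

   ```java
   // The next fetch offset moves, but the start offset stays the same because acquire only
   // marks these offsets as archived. The start offset advances once records are acknowledged
   // (or released).
   assertEquals(0, sharePartition.startOffset());
   ```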



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1098,103 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are 
between the fetch offset
+     * and the base offset of the first fetched batch. This method is required 
to handle the compacted
+     * topics where the already fetched batch which is marked re-available, 
might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to 
allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can 
be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. 
Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, 
find the floor offset
+            // for the fetch offset and then find the sub-map from the floor 
offset to the base offset.
+            long mapFetchOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(fetchOffset);

Review Comment:
   And this is `floorEntry`.
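   A minimal sketch of that rename (variable name only, logic unchanged from the PR):

   ```java
   Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(fetchOffset);
   if (floorEntry != null && floorEntry.getValue().lastOffset() >= fetchOffset) {
       mapFetchOffset = floorEntry.getKey();
   }
   ```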



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1098,103 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are 
between the fetch offset
+     * and the base offset of the first fetched batch. This method is required 
to handle the compacted
+     * topics where the already fetched batch which is marked re-available, 
might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to 
allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can 
be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. 
Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, 
find the floor offset
+            // for the fetch offset and then find the sub-map from the floor 
offset to the base offset.
+            long mapFetchOffset = fetchOffset;

Review Comment:
   I think this is `floorOffset` logically, that is, the offset of the cache entry which contains the fetch offset.
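   Applying both naming suggestions together, a sketch (names only; the logic is as in the PR):

   ```java
   // Key of the cache entry which contains the fetch offset; falls back to the fetch offset
   // itself when no cached batch covers it.
   long floorOffset = fetchOffset;
   Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(fetchOffset);
   if (floorEntry != null && floorEntry.getValue().lastOffset() >= fetchOffset) {
       floorOffset = floorEntry.getKey();
   }

   NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset, true, baseOffset, false);
   ```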



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1098,103 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are 
between the fetch offset
+     * and the base offset of the first fetched batch. This method is required 
to handle the compacted
+     * topics where the already fetched batch which is marked re-available, 
might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to 
allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can 
be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. 
Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, 
find the floor offset
+            // for the fetch offset and then find the sub-map from the floor 
offset to the base offset.
+            long mapFetchOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(fetchOffset);
+            if (floorOffset != null && floorOffset.getValue().lastOffset() >= 
fetchOffset) {
+                mapFetchOffset = floorOffset.getKey();
+            }
+
+            NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(mapFetchOffset, true, baseOffset, false);
+            if (subMap.isEmpty()) {
+                // No stale batches to archive.
+                return;
+            }
+            // Though such batches can be removed from the cache, but it is 
better to archive them so

Review Comment:
   nit: I'd add a blank line here, and above the next comment too, to improve the spacing of the code for readability. This is a tricky area, so anything that makes it easier to follow helps.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),

Review Comment:
   I think we probably ought to define constants for these 0,1,2,3 and use them across all components.
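   For illustration, one possible shape (the class name and its home are assumptions rather than existing API; the values follow the acknowledge types in KIP-932, where 3 is REJECT, not GAP):

   ```java
   // Hypothetical constants holder; name and placement are placeholders for illustration only.
   public final class AcknowledgeTypeId {
       public static final byte GAP = 0;     // offset was never delivered (gap in the fetched data)
       public static final byte ACCEPT = 1;  // record processed successfully
       public static final byte RELEASE = 2; // record released for redelivery
       public static final byte REJECT = 3;  // record rejected and not redelivered

       private AcknowledgeTypeId() { }
   }
   ```

   The test lines would then read, for example, `new ShareAcknowledgementBatch(10, 11, List.of(AcknowledgeTypeId.ACCEPT))`.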



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),
+            new ShareAcknowledgementBatch(15, 16, List.of((byte) 2 /* RELEASE 
*/))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+    }
+
+    /**
+     * Test the case where the available cached batches never appear again in 
fetch response within the
+     * previous fetch offset range. Also remove records from the previous 
fetch batches.
+     * <p>
+     * Such case can arise with compacted topics where complete batches are 
removed or records within
+     * batches are removed.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch 
response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for 0 offset batch starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include batch from offset 5. And compact batch starting at 
offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for 20 offset batch starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include batch from offset 45. And compact the batch at 
offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for 50 offset batch starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records for in middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines 
the acquisition range using
+        // the first and last offsets of the fetched batches and acquires all 
available cached batches
+        // within that range. That means the batch from offset 45-49 which is 
not included in the
+        // fetch response will also be acquired. Similarly, for the batch from 
offset 5-19 which is
+        // anyway in the bigger cached batch of 0-34, will also be acquired. 
This avoids iterating
+        // through individual fetched batch boundaries; the client is 
responsible for reporting any
+        // data gaps via acknowledgements. This test also covers the edge case 
where the last fetched
+        // batch is compacted, and its last offset is before the previously 
cached version's last offset.
+        // In this situation, the last batch's offset state tracking is 
initialized. This is handled
+        // correctly because the client will send individual offset 
acknowledgements, which require offset
+        // state tracking anyway. While this last scenario is unlikely in 
practice (as a batch's reported
+        // last offset should remain correct even after compaction), the test 
verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());

Review Comment:
   I think it's worth asserting the cached state here too.
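   For example, something along these lines (a sketch only; the exact expected states depend on how acquire handles the compacted batches and would need to be confirmed against the actual behaviour):

   ```java
   // The four cached entries (0-34, 40-44, 45-49, 50-64) are expected to remain keyed as before.
   assertEquals(4, sharePartition.cachedState().size());
   assertNotNull(sharePartition.cachedState().get(0L));
   assertNotNull(sharePartition.cachedState().get(40L));
   assertNotNull(sharePartition.cachedState().get(45L));
   // Per the comment above, the last batch is expected to have per-offset state tracking initialized.
   assertNotNull(sharePartition.cachedState().get(50L).offsetState());
   ```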



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6190,6 +6198,289 @@ public void 
testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, List.of((byte) 0 /* GAP */)),
+            new ShareAcknowledgementBatch(10, 11, List.of((byte) 1 /* ACCEPT 
*/)),
+            new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* GAP */)),

Review Comment:
   3 != GAP.
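   Presumably the intended acknowledge type here is REJECT (3 in KIP-932), so the corrected label would be:

   ```java
   new ShareAcknowledgementBatch(12, 14, List.of((byte) 3 /* REJECT */)),
   ```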



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -651,13 +682,19 @@ public ShareAcquiredRecords acquire(
         lock.writeLock().lock();
         try {
             long baseOffset = firstBatch.baseOffset();
+            // There might be cached batches which are stale due to topic 
compaction hence archive them.

Review Comment:
   nit: Please add some blank lines to space out this tricky code.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -651,13 +682,19 @@ public ShareAcquiredRecords acquire(
         lock.writeLock().lock();
         try {
             long baseOffset = firstBatch.baseOffset();
+            // There might be cached batches which are stale due to topic 
compaction hence archive them.
+            maybeArchiveStaleBatches(fetchOffset, baseOffset);
             // Find the floor batch record for the request batch. The request 
batch could be
             // for a subset of the in-flight batch i.e. cached batch of offset 
10-14 and request batch
             // of 12-13. Hence, floor entry is fetched to find the sub-map.
             Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(baseOffset);

Review Comment:
   I think I'd call this `floorEntry` because you need to get the key to obtain the offset.
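   A sketch of the rename at this call site (name only; surrounding logic as in the PR):

   ```java
   // Find the floor batch record for the request batch; the entry's key gives the base offset
   // of that cached batch.
   Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(baseOffset);
   ```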


