junrao commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1945434837


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty, do not check as the method is called only when the
+        // acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
+        try {
+            final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            // Track the previous batch to adjust the start position in case the first acquired offset
+            // is between the batch.
+            FileChannelRecordBatch previousBatch = null;
+            for (FileChannelRecordBatch batch : fileRecords.batches()) {
+                // If the batch base offset is less than the first acquired offset, then the start position
+                // should be updated to skip the batch.
+                if (batch.baseOffset() < firstAcquiredOffset) {

Review Comment:
   Hmm, not sure why we need to maintain `previousBatch` below. Could we just set `startPosition` when `batch.lastOffset()` is >= `firstAcquiredOffset` for the first time?
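   For illustration, a rough sketch of the suggested simplification, using a hypothetical `BatchInfo` stand-in for the `FileChannelRecordBatch` metadata (the real method works on `FileRecords` directly): `startPosition` is set the first time `batch.lastOffset() >= firstAcquiredOffset`, so no `previousBatch` bookkeeping is needed.

```java
import java.util.List;

class SliceBoundsSketch {
    // Hypothetical stand-in for FileChannelRecordBatch metadata; the accessor
    // names mirror baseOffset(), lastOffset(), position() and sizeInBytes().
    record BatchInfo(long baseOffset, long lastOffset, int position, int sizeInBytes) {}

    // Computes {startPosition, size} of the byte range covering every batch that
    // overlaps [firstAcquiredOffset, lastAcquiredOffset]. startPosition is set the
    // first time batch.lastOffset() >= firstAcquiredOffset, as the review suggests.
    static int[] sliceBounds(List<BatchInfo> batches, long firstAcquiredOffset, long lastAcquiredOffset) {
        int startPosition = -1;
        int endPosition = -1;
        for (BatchInfo batch : batches) {
            if (batch.baseOffset() > lastAcquiredOffset) {
                break; // Batches are offset-ordered; later ones cannot overlap.
            }
            if (startPosition < 0 && batch.lastOffset() >= firstAcquiredOffset) {
                startPosition = batch.position(); // First overlapping batch.
            }
            if (startPosition >= 0) {
                endPosition = batch.position() + batch.sizeInBytes();
            }
        }
        return startPosition < 0 ? new int[] {0, 0} : new int[] {startPosition, endPosition - startPosition};
    }
}
```

   This keeps the in-between-batch case (first acquired offset falling inside a batch) correct, since the containing batch's `lastOffset()` is still >= the first acquired offset.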



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -678,6 +680,9 @@ public ShareAcquiredRecords acquire(
             for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
                 // If the acquired count is equal to the max fetch records then break the loop.
                 if (acquiredCount >= maxFetchRecords) {
+                    // If the limit to acquire records is reached then it means there exist additional
+                    // fetch batches which cannot be acquired.
+                    subsetAcquired = true;

Review Comment:
   It's a bit messy to maintain this. Do we really need this since we can find this out by iterating the batches, which is relatively cheap?
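   A sketch of that alternative, deriving the flag after acquisition instead of maintaining it inside `acquire()`. The `BatchBounds` record is a hypothetical stand-in for a fetched batch's `baseOffset()`/`lastOffset()` pair; the real check would iterate the fetched `RecordBatch`es.

```java
import java.util.List;

class SubsetAcquiredSketch {
    // Hypothetical holder for a fetched batch's offset bounds, mirroring
    // RecordBatch.baseOffset() and lastOffset().
    record BatchBounds(long baseOffset, long lastOffset) {}

    // Derives the subset flag after the fact: the acquisition is a subset of the
    // fetch when some fetched offset lies outside the acquired boundary offsets.
    static boolean subsetAcquired(List<BatchBounds> fetchedBatches, long firstAcquiredOffset, long lastAcquiredOffset) {
        for (BatchBounds batch : fetchedBatches) {
            if (batch.baseOffset() < firstAcquiredOffset || batch.lastOffset() > lastAcquiredOffset) {
                return true;
            }
        }
        return false;
    }
}
```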



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {

Review Comment:
   Hmm, with remote storage, it's possible for records to be of type MemoryRecords. It would be useful to slice it too.
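   Since MemoryRecords is buffer-backed, the same boundary slice could be taken on the underlying ByteBuffer once the byte range is known. A minimal sketch of the buffer-level operation, assuming the start position and size have already been computed from the batch boundaries; wrapping the result back into records would go through `MemoryRecords.readableRecords(...)`.

```java
import java.nio.ByteBuffer;

class MemorySliceSketch {
    // Narrows a duplicate of the backing buffer to [startPosition, startPosition + size)
    // without copying bytes. In Kafka the result could be wrapped back into records
    // via MemoryRecords.readableRecords(buffer); everything else here is plain NIO.
    static ByteBuffer slice(ByteBuffer buffer, int startPosition, int size) {
        ByteBuffer dup = buffer.duplicate(); // Shares content; independent position/limit.
        dup.position(startPosition);
        dup.limit(startPosition + size);
        return dup.slice();
    }
}
```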



##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -449,4 +468,202 @@ tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
         Mockito.verify(exceptionHandler, times(1)).accept(new SharePartitionKey("grp", tp0), exception);
         Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
     }
+
+    @Test
+    public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
+        // Create 1 batch of records with 10 records.
+        FileRecords records = createFileRecords(Map.of(5L, 10));
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of first offset bound should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of last offset bound should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, starting from the first offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, ending at the last offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, in between the batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of base offset bounds should return empty records.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of last offset bounds should return empty records.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(15).setLastOffset(18).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException {
+        // Create 3 batches of records with 3, 2 and 4 records respectively.
+        LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+        offsetValues.put(0L, 3);
+        offsetValues.put(3L, 2);
+        offsetValues.put(7L, 4); // Gap of 2 offsets between batches.
+        FileRecords records = createFileRecords(offsetValues);
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 11, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets from all batches, but only first record from last batch. Should return
+        // all batches.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only first batch offsets, should return only first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        List<RecordBatch> recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only second batch offsets, should return only second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only last batch offsets, should return only last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of first batch offsets, should return only first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of second batch offsets, should return only second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of last batch offsets, should return only last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire including gaps between batches, should return 2 batches.

Review Comment:
   Hmm, there are no gaps btw batches, right?



##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -369,49 +379,58 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
         ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(),
             new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);
 
-        MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
-            new SimpleRecord("0".getBytes(), "v".getBytes()),
-            new SimpleRecord("1".getBytes(), "v".getBytes()),
-            new SimpleRecord("2".getBytes(), "v".getBytes()),
-            new SimpleRecord(null, "value".getBytes()));
+        LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+        offsetValues.put(0L, 1);
+        offsetValues.put(1L, 1);
+        offsetValues.put(2L, 1);
+        offsetValues.put(3L, 1);
+        Records records1 = createFileRecords(offsetValues);
+
+        offsetValues.clear();
+        offsetValues.put(100L, 4);
+        Records records2 = createFileRecords(offsetValues);
 
         FetchPartitionData fetchPartitionData1 = new FetchPartitionData(Errors.NONE, 0L, 0L,
             records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false);
         FetchPartitionData fetchPartitionData2 = new FetchPartitionData(Errors.NONE, 0L, 0L,
-            records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false);
 
         when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, fetchPartitionData1)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1), true));
         when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, fetchPartitionData2)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1), true));
 
         // Send the topic partitions in order so can validate if correct mock is called, accounting
         // the offset count for the acquired records from the previous share partition acquire.
         Map<TopicIdPartition, FetchPartitionData> responseData1 = new LinkedHashMap<>();
         responseData1.put(tp0, fetchPartitionData1);
         responseData1.put(tp1, fetchPartitionData2);
 
-        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =

Review Comment:
   responseData1 => responseData ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
