clolov commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1944774713


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, 
TopicPartition tp) {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is 
done based on the first
+     * and last offset of the acquired records from the list. The slicing 
doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the 
non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and 
the acquired records are a subset
+     *        of the fetched records. Otherwise, the original records are 
returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, 
ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof 
FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty, do not check as the 
method is called only when the
+        // acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = 
shareAcquiredRecords.acquiredRecords();
+        try {
+            final long firstAcquiredOffset = 
acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = 
acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            // Track the previous batch to adjust the start position in case 
the first acquired offset
+            // is between the batch.
+            FileChannelRecordBatch previousBatch = null;
+            for (FileChannelRecordBatch batch : fileRecords.batches()) {
+                // If the batch base offset is less than the first acquired 
offset, then the start position
+                // should be updated to skip the batch.
+                if (batch.baseOffset() < firstAcquiredOffset) {
+                    startPosition += batch.sizeInBytes();
+                    previousBatch = batch;
+                    continue;
+                }
+                // If the first acquired offset is between the batch, then 
adjust the start position

Review Comment:
   ```suggestion
                   // If the first acquired offset is within the batch, then 
adjust the start position
   ```



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, 
TopicPartition tp) {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is 
done based on the first
+     * and last offset of the acquired records from the list. The slicing 
doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the 
non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and 
the acquired records are a subset
+     *        of the fetched records. Otherwise, the original records are 
returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, 
ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof 
FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty, do not check as the 
method is called only when the
+        // acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = 
shareAcquiredRecords.acquiredRecords();
+        try {
+            final long firstAcquiredOffset = 
acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = 
acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            // Track the previous batch to adjust the start position in case 
the first acquired offset
+            // is between the batch.

Review Comment:
   ```suggestion
               // is within the batch.
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1186,6 +1185,50 @@ public void 
testAcquireWithMultipleBatchesAndMaxFetchRecords() {
         assertNull(sharePartition.cachedState().get(10L).offsetState());
     }
 
+    @Test
+    public void testAcquireMultipleRecordsWithOverlapAndMaxFetchRecords() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        MemoryRecords records = memoryRecords(5, 0);
+        // Acquire 5 records.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                2,
+                new FetchPartitionData(Errors.NONE, 20, 3, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        records = memoryRecords(5, 5);
+        // Acquire another 5 records.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                2,
+                new FetchPartitionData(Errors.NONE, 20, 3, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+        // Release the acquired records so they can be re-acquired and max 
fetch records can be tested
+        // for overlapping records.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Add batches from 0-9 offsets and 10-12, 5-9 should be acquired and 
0-4 should be ignored.
+        // 10-12 should be ignored as it exceeds the max fetch records.

Review Comment:
   Could you go in a bit more detail here, because as far as I can see you are 
able to obtain records 0-4 as they have been released for redelivery - the 
assertion also starts comparing from offset 0, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to