apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1931875911


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -633,11 +661,33 @@ public ShareAcquiredRecords acquire(
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
             }
 
-            log.trace("Overlap exists with in-flight records. Acquire the records if available for"
-                + " the share partition: {}-{}", groupId, topicIdPartition);
-            List<AcquiredRecords> result = new ArrayList<>();
             // The acquired count is used to track the number of records acquired for the request.
             int acquiredCount = 0;
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            // If baseOffset is less than the first key of submap, this means the fetch happened for a gap in the cachedState.
+            // Thus a new batch needs to be acquired for the gap.
+            if (baseOffset < subMap.firstKey()) {
+                // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+                // inFlight batch in the cachedState
+                if (lastBatch.lastOffset() < (subMap.firstKey())) {
+                    // The entire request batch is part of the gap
+                    return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                        firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                } else {
+                    result.add(new AcquiredRecords()
+                        .setFirstOffset(firstBatch.baseOffset())
+                        .setLastOffset(subMap.firstKey() - 1)
+                        .setDeliveryCount((short) 1));
+                    acquiredCount += (int) (subMap.firstKey() - firstBatch.baseOffset());
+                    acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                        firstBatch.baseOffset(), subMap.firstKey() - 1, batchSize, maxFetchRecords);
+                }
+            }

Review Comment:
   It is surprising to see this code outside the for loop below. Why consider 
only the first gap? Can't there be more gaps? For example, the persister returns 
start offset 10 and acked batches 15-20 and 30-40, and a subsequent fetch returns 
batches from 10-50. How are we then acquiring the batch from 21-29? Do we have a 
test case?
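
To make the scenario concrete, here is a minimal, self-contained sketch that lists every uncovered range for a fetch window. `GapFinder`, `findGaps`, and the base-offset-to-last-offset map are hypothetical names for illustration only. They are not part of `SharePartition`, and the sketch assumes the cached batches do not overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class GapFinder {

    /**
     * Hypothetical helper (not Kafka code): returns every inclusive [first, last]
     * offset range within [fetchStart, fetchEnd] that no cached batch covers.
     * The map goes from batch base offset to batch last offset (inclusive);
     * batches are assumed to be non-overlapping.
     */
    static List<long[]> findGaps(NavigableMap<Long, Long> cached, long fetchStart, long fetchEnd) {
        List<long[]> gaps = new ArrayList<>();
        long next = fetchStart; // next offset not yet accounted for
        for (Map.Entry<Long, Long> batch : cached.entrySet()) {
            long base = batch.getKey();
            long last = batch.getValue();
            if (last < next) {
                continue; // batch ends before the part of the window still to be examined
            }
            if (base > fetchEnd) {
                break; // batch starts after the window
            }
            if (base > next) {
                gaps.add(new long[] {next, base - 1}); // uncovered range before this batch
            }
            next = last + 1;
        }
        if (next <= fetchEnd) {
            gaps.add(new long[] {next, fetchEnd}); // uncovered tail after the last batch
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Scenario from the comment above: acked batches 15-20 and 30-40, fetch 10-50.
        NavigableMap<Long, Long> cached = new TreeMap<>();
        cached.put(15L, 20L);
        cached.put(30L, 40L);
        for (long[] gap : findGaps(cached, 10L, 50L)) {
            System.out.println(gap[0] + "-" + gap[1]);
        }
    }
}
```

For this input the sketch reports three gaps, 10-14, 21-29 and 41-50, so a check that only compares against the first key of the submap would account for the first of them alone.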



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -633,11 +661,33 @@ public ShareAcquiredRecords acquire(
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
             }
 
-            log.trace("Overlap exists with in-flight records. Acquire the records if available for"
-                + " the share partition: {}-{}", groupId, topicIdPartition);
-            List<AcquiredRecords> result = new ArrayList<>();
             // The acquired count is used to track the number of records acquired for the request.
             int acquiredCount = 0;
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            // If baseOffset is less than the first key of submap, this means the fetch happened for a gap in the cachedState.
+            // Thus a new batch needs to be acquired for the gap.

Review Comment:
   ```suggestion
               // Thus, a new batch needs to be acquired for the gap.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1758,6 +1837,11 @@ private void maybeUpdateCachedStateAndOffsets() {
                 return;
             }
 
+            // if there is an acquirable gap initially, then we should not move the startOffset
+            if (initialReadGapOffset != null && initialReadGapOffset.gapStartOffset() == startOffset) {
+                return;
+            }

Review Comment:
   Are we saying the startOffset shouldn't be moved until all gaps are 
fetched? If so, why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
