apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636612042


##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire(
         FetchPartitionData fetchPartitionData
     ) {
         log.trace("Received acquire request for share partition: {}-{}", 
memberId, fetchPartitionData);
+        RecordBatch lastBatch = 
fetchPartitionData.records.lastBatch().orElse(null);
+        if (lastBatch == null) {
+            // Nothing to acquire.
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
 
-        CompletableFuture<List<AcquiredRecords>> future = new 
CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException("Not 
implemented"));
+        // We require the first batch of records to get the base offset. Stop 
parsing further
+        // batches.
+        RecordBatch firstBatch = 
fetchPartitionData.records.batches().iterator().next();
+        lock.writeLock().lock();
+        try {
+            long baseOffset = firstBatch.baseOffset();
+            // Find the floor batch record for the request batch. The request 
batch could be
+            // for a subset of the in-flight batch i.e. cached batch of offset 
10-14 and request batch
+            // of 12-13. Hence, floor entry is fetched to find the sub-map.
+            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(baseOffset);
+            // A floor entry might be found without that batch actually 
overlapping the request,
+            // i.e. when the request batch base offset is ahead of the last 
offset of the floor entry:
+            // for a cached batch of 10-14 and request batch of 15-18, a floor 
entry is found but there is no overlap.
+            if (floorOffset != null && floorOffset.getValue().lastOffset >= 
baseOffset) {
+                baseOffset = floorOffset.getKey();
+            }
+            // Validate if the fetch records are already part of existing 
batches and if available.
+            NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+            // No overlap with request offsets in the cache for in-flight 
records. Acquire the complete
+            // batch.
+            if (subMap.isEmpty()) {
+                log.trace("No cached data exists for the share partition for 
requested fetch batch: {}-{}",
+                    groupId, topicIdPartition);
+                return 
CompletableFuture.completedFuture(Collections.singletonList(
+                    acquireNewBatchRecords(memberId, firstBatch.baseOffset(), 
lastBatch.lastOffset())));
+            }
 
-        return future;
+            log.trace("Overlap exists with in-flight records. Acquire the 
records if available for"
+                + " the share group: {}-{}", groupId, topicIdPartition);
+            List<AcquiredRecords> result = new ArrayList<>();
+            // The fetched records are already part of the in-flight records. 
The records might
+            // be available for re-delivery hence try acquiring same. The 
request batches could
+            // be an exact match, subset or span over multiple already fetched 
batches.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+                // Compute if the batch is a full match.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastBatch.lastOffset());
+
+                if (!fullMatch || inFlightBatch.offsetState != null) {
+                    log.trace("Subset or offset tracked batch record found for 
share partition,"
+                            + " batch: {} request offsets - first: {}, last: 
{} for the share"
+                            + " group: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
+                        lastBatch.lastOffset(), groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState == null) {
+                        // The request is a subset of the in-flight batch 
but offset
+                        // tracking has not been initialized yet, which means 
we can acquire
+                        // a subset of offsets from the in-flight batch only 
if the
+                        // complete batch is still available. Hence, do a 
pre-check to avoid exploding
+                        // the in-flight offset tracking unnecessarily.
+                        if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
+                            log.trace("The batch is not available to acquire 
in share group: {}-{}, skipping: {}"
+                                    + " skipping offset tracking for batch as 
well.", groupId,
+                                topicIdPartition, inFlightBatch);
+                            continue;
+                        }
+                        // The request batch is a subset or per offset state 
is managed hence update
+                        // the offsets state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+                    acquireSubsetBatchRecords(memberId, 
firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
+                    continue;
+                }
+
+                // The in-flight batch is a full match hence change the state 
of the complete.

Review Comment:
   Missed it, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to