apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636610291


##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire(
         FetchPartitionData fetchPartitionData
     ) {
         log.trace("Received acquire request for share partition: {}-{}", 
memberId, fetchPartitionData);
+        RecordBatch lastBatch = 
fetchPartitionData.records.lastBatch().orElse(null);
+        if (lastBatch == null) {
+            // Nothing to acquire.
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
 
-        CompletableFuture<List<AcquiredRecords>> future = new 
CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException("Not 
implemented"));
+        // We require the first batch of records to get the base offset. Stop 
parsing further
+        // batches.
+        RecordBatch firstBatch = 
fetchPartitionData.records.batches().iterator().next();
+        lock.writeLock().lock();
+        try {
+            long baseOffset = firstBatch.baseOffset();
+            // Find the floor batch record for the request batch. The request 
batch could be
+            // for a subset of the in-flight batch i.e. cached batch of offset 
10-14 and request batch
+            // of 12-13. Hence, floor entry is fetched to find the sub-map.
+            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(baseOffset);
+            // We might find a batch with floor entry but not necessarily that 
batch has an overlap,
+            // if the request batch base offset is ahead of last offset from 
floor entry i.e. cached
+            // batch of 10-14 and request batch of 15-18, though floor entry 
is found but no overlap.
+            if (floorOffset != null && floorOffset.getValue().lastOffset >= 
baseOffset) {
+                baseOffset = floorOffset.getKey();
+            }
+            // Validate if the fetch records are already part of existing 
batches and if available.
+            NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+            // No overlap with request offsets in the cache for in-flight 
records. Acquire the complete
+            // batch.
+            if (subMap.isEmpty()) {
+                log.trace("No cached data exists for the share partition for 
requested fetch batch: {}-{}",
+                    groupId, topicIdPartition);
+                return 
CompletableFuture.completedFuture(Collections.singletonList(
+                    acquireNewBatchRecords(memberId, firstBatch.baseOffset(), 
lastBatch.lastOffset())));
+            }
 
-        return future;
+            log.trace("Overlap exists with in-flight records. Acquire the 
records if available for"
+                + " the share group: {}-{}", groupId, topicIdPartition);
+            List<AcquiredRecords> result = new ArrayList<>();
+            // The fetched records are already part of the in-flight records. 
The records might
+            // be available for re-delivery hence try acquiring same. The 
request batches could
+            // be an exact match, subset or span over multiple already fetched 
batches.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+                // Compute if the batch is a full match.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastBatch.lastOffset());
+
+                if (!fullMatch || inFlightBatch.offsetState != null) {
+                    log.trace("Subset or offset tracked batch record found for 
share partition,"
+                            + " batch: {} request offsets - first: {}, last: 
{} for the share"
+                            + " group: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
+                        lastBatch.lastOffset(), groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState == null) {
+                        // Though the request is a subset of in-flight batch 
but the offset
+                        // tracking has not been initialized yet which means 
that we could only
+                        // acquire subset of offsets from the in-flight batch 
but only if the
+                        // complete batch is available yet. Hence, do a 
pre-check to avoid exploding
+                        // the in-flight offset tracking unnecessarily.
+                        if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
+                            log.trace("The batch is not available to acquire 
in share group: {}-{}, skipping: {}"
+                                    + " skipping offset tracking for batch as 
well.", groupId,
+                                topicIdPartition, inFlightBatch);
+                            continue;
+                        }
+                        // The request batch is a subset or per offset state 
is managed hence update
+                        // the offsets state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+                    acquireSubsetBatchRecords(memberId, 
firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
+                    continue;
+                }
+
+                // The in-flight batch is a full match hence change the state 
of the complete.
+                if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
+                    log.trace("The batch is not available to acquire in share 
group: {}-{}, skipping: {}",
+                        groupId, topicIdPartition, inFlightBatch);
+                    continue;
+                }
+
+                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, 
memberId);
+                if (updateResult == null) {
+                    log.info("Unable to acquire records for the batch: {} in 
share group: {}-{}",
+                        inFlightBatch, groupId, topicIdPartition);
+                    continue;
+                }
+                // Schedule acquisition lock timeout for the batch.
+                AcquisitionLockTimerTask acquisitionLockTimeoutTask = 
scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset());
+                // Set the acquisition lock timeout task for the batch.
+                
inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask);
+
+                result.add(new AcquiredRecords()
+                    .setFirstOffset(inFlightBatch.firstOffset())
+                    .setLastOffset(inFlightBatch.lastOffset())
+                    .setDeliveryCount((short) 
inFlightBatch.batchDeliveryCount()));
+            }
+
+            // Some of the request offsets are not found in the fetched 
batches. Acquire the
+            // missing records as well.
+            if (subMap.lastEntry().getValue().lastOffset < 
lastBatch.lastOffset()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to