apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1636610291
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire( FetchPartitionData fetchPartitionData ) { log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null); + if (lastBatch == null) { + // Nothing to acquire. + return CompletableFuture.completedFuture(Collections.emptyList()); + } - CompletableFuture<List<AcquiredRecords>> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + // We require the first batch of records to get the base offset. Stop parsing further + // batches. + RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next(); + lock.writeLock().lock(); + try { + long baseOffset = firstBatch.baseOffset(); + // Find the floor batch record for the request batch. The request batch could be + // for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch + // of 12-13. Hence, floor entry is fetched to find the sub-map. + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(baseOffset); + // We might find a batch with floor entry but not necessarily that batch has an overlap, + // if the request batch base offset is ahead of last offset from floor entry i.e. cached + // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap. + if (floorOffset != null && floorOffset.getValue().lastOffset >= baseOffset) { + baseOffset = floorOffset.getKey(); + } + // Validate if the fetch records are already part of existing batches and if available. + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true); + // No overlap with request offsets in the cache for in-flight records. Acquire the complete + // batch. 
+ if (subMap.isEmpty()) { + log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", + groupId, topicIdPartition); + return CompletableFuture.completedFuture(Collections.singletonList( + acquireNewBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset()))); + } - return future; + log.trace("Overlap exists with in-flight records. Acquire the records if available for" + + " the share group: {}-{}", groupId, topicIdPartition); + List<AcquiredRecords> result = new ArrayList<>(); + // The fetched records are already part of the in-flight records. The records might + // be available for re-delivery hence try acquiring same. The request batches could + // be an exact match, subset or span over multiple already fetched batches. + for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); + // Compute if the batch is a full match. + boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset()); + + if (!fullMatch || inFlightBatch.offsetState != null) { + log.trace("Subset or offset tracked batch record found for share partition," + + " batch: {} request offsets - first: {}, last: {} for the share" + + " group: {}-{}", inFlightBatch, firstBatch.baseOffset(), + lastBatch.lastOffset(), groupId, topicIdPartition); + if (inFlightBatch.offsetState == null) { + // Though the request is a subset of in-flight batch but the offset + // tracking has not been initialized yet which means that we could only + // acquire subset of offsets from the in-flight batch but only if the + // complete batch is available yet. Hence, do a pre-check to avoid exploding + // the in-flight offset tracking unnecessarily. 
+ if (inFlightBatch.batchState() != RecordState.AVAILABLE) { + log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}" + + " skipping offset tracking for batch as well.", groupId, + topicIdPartition, inFlightBatch); + continue; + } + // The request batch is a subset or per offset state is managed hence update + // the offsets state in the in-flight batch. + inFlightBatch.maybeInitializeOffsetStateUpdate(); + } + acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result); + continue; + } + + // The in-flight batch is a full match hence change the state of the complete. + if (inFlightBatch.batchState() != RecordState.AVAILABLE) { + log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}", + groupId, topicIdPartition, inFlightBatch); + continue; + } + + InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, memberId); + if (updateResult == null) { + log.info("Unable to acquire records for the batch: {} in share group: {}-{}", + inFlightBatch, groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset()); + // Set the acquisition lock timeout task for the batch. + inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask); + + result.add(new AcquiredRecords() + .setFirstOffset(inFlightBatch.firstOffset()) + .setLastOffset(inFlightBatch.lastOffset()) + .setDeliveryCount((short) inFlightBatch.batchDeliveryCount())); + } + + // Some of the request offsets are not found in the fetched batches. Acquire the + // missing records as well. + if (subMap.lastEntry().getValue().lastOffset < lastBatch.lastOffset()) { Review Comment: Done. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org