apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1637992948
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,126 +485,361 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. + cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. + AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Check if the in-flight batch is a full match with the request offsets. The full match represents + * complete overlap of the in-flight batch with the request offsets. + * + * @param inFlightBatch The in-flight batch to check for full match. + * @param firstOffsetToCompare The first offset of the request batch. + * @param lastOffsetToCompare The last offset of the request batch. + * + * @return True if the in-flight batch is a full match with the request offsets, false otherwise. */ - private static class InFlightBatch { - /** - * The offset of the first record in the batch that is fetched from the log. - */ + private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) { + return inFlightBatch.firstOffset() >= firstOffsetToCompare && inFlightBatch.lastOffset() <= lastOffsetToCompare; + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented Review Comment: Done, I left it just beacuse there might be some other handling required. Removed it now. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -54,6 +60,7 @@ public class SharePartition { * The state of the records determines if the records should be re-delivered, move the next fetch * offset, or be state persisted to disk. */ + // Visible for testing public enum RecordState { Review Comment: thanks, done. I agree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org