apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1636601366
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,37 +485,264 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. + cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. + AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Apply acquisition lock to acquired records. + * @param memberId The member id of the client that is putting the acquisition lock. + * @param firstOffset The first offset of the acquired records. + * @param lastOffset The last offset of the acquired records. */ Review Comment: Thanks, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org