AndrewJSchofield commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1634746185
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -350,87 +752,73 @@ public String toString() { * fetched from the leader. The state of the record is used to determine if the record should * be re-deliver or if it can be acknowledged or archived. */ - private static class InFlightState { - /** - * The state of the fetch batch records. - */ + static final class InFlightState { + + // The state of the fetch batch records. private RecordState state; - /** - * The number of times the records has been delivered to the client. - */ + // The number of times the records has been delivered to the client. private int deliveryCount; - /** - * The member id of the client that is fetching/acknowledging the record. - */ + // The member id of the client that is fetching/acknowledging the record. private String memberId; + // The timer task for the acquisition lock timeout. + private AcquisitionLockTimerTask acquisitionLockTimeoutTask; + InFlightState(RecordState state, int deliveryCount, String memberId) { + this(state, deliveryCount, memberId, null); + } + + InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) { this.state = state; this.deliveryCount = deliveryCount; this.memberId = memberId; + this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } - @Override - public int hashCode() { - return Objects.hash(state, deliveryCount, memberId); + void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) { + this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; + void cancelAndClearAcquisitionLockTimeoutTask() { + acquisitionLockTimeoutTask.cancel(); + acquisitionLockTimeoutTask = null; + } + + /** + * Try to update the state of the records. 
The state of the records can only be updated if the + * new state is allowed to be transitioned from old state. The delivery count is not incremented + * if the state update is unsuccessful. + * + * @param newState The new state of the records. + * @param incrementDeliveryCount Whether to increment the delivery count. + * + * @return {@code InFlightState} if update succeeds, null otherwise. Returning state + * helps update chaining. + */ + private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) { + try { + if (newState == RecordState.AVAILABLE && deliveryCount >= maxDeliveryCount) { + newState = RecordState.ARCHIVED; + } + state = state.validateTransition(newState); + if (incrementDeliveryCount && newState != RecordState.ARCHIVED) { + deliveryCount++; + } + memberId = newMemberId; + return this; + } catch (IllegalStateException e) { + log.info("Failed to update state of the records", e); Review Comment: I don't think log.info is appropriate here. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,37 +485,264 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. 
+ cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. 
+ AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Apply acquisition lock to acquired records. + * @param memberId The member id of the client that is putting the acquisition lock. + * @param firstOffset The first offset of the acquired records. + * @param lastOffset The last offset of the acquired records. */ - private static class InFlightBatch { - /** - * The offset of the first record in the batch that is fetched from the log. 
- */ + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset, long delayMs) { + AcquisitionLockTimerTask acquistionLockTimerTask = acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs); + timer.add(acquistionLockTimerTask); + return acquistionLockTimerTask; + } + + private AcquisitionLockTimerTask acquisitionLockTimerTask(String memberId, long firstOffset, long lastOffset, long delayMs) { + return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset); + } + + private final class AcquisitionLockTimerTask extends TimerTask { + private final long expirationMs; + private final String memberId; private final long firstOffset; - /** - * The last offset of the batch that is fetched from the log. - */ private final long lastOffset; + + AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) { + super(delayMs); + this.expirationMs = time.hiResClockMs() + delayMs; + this.memberId = memberId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + + long expirationMs() { + return expirationMs; + } + /** - * The in-flight state of the fetched records. If the offset state map is empty then inflightState - * determines the state of the complete batch else individual offset determines the state of - * the respective records. + * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. */ + @Override + public void run() { + releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); + } + } + + private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { + // TODO: Implement the logic to release the acquisition lock on timeout. + } + + private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) { Review Comment: I suggest adding javadoc for this method. 
########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,37 +485,264 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. + cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. 
+ break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. + AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Apply acquisition lock to acquired records. + * @param memberId The member id of the client that is putting the acquisition lock. + * @param firstOffset The first offset of the acquired records. + * @param lastOffset The last offset of the acquired records. 
*/ - private static class InFlightBatch { - /** - * The offset of the first record in the batch that is fetched from the log. - */ + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset, long delayMs) { + AcquisitionLockTimerTask acquistionLockTimerTask = acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs); + timer.add(acquistionLockTimerTask); + return acquistionLockTimerTask; + } + + private AcquisitionLockTimerTask acquisitionLockTimerTask(String memberId, long firstOffset, long lastOffset, long delayMs) { + return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset); + } + + private final class AcquisitionLockTimerTask extends TimerTask { + private final long expirationMs; + private final String memberId; private final long firstOffset; - /** - * The last offset of the batch that is fetched from the log. - */ private final long lastOffset; + + AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) { + super(delayMs); + this.expirationMs = time.hiResClockMs() + delayMs; + this.memberId = memberId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + + long expirationMs() { + return expirationMs; + } + /** - * The in-flight state of the fetched records. If the offset state map is empty then inflightState - * determines the state of the complete batch else individual offset determines the state of - * the respective records. + * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. */ + @Override + public void run() { + releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); + } + } + + private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { + // TODO: Implement the logic to release the acquisition lock on timeout. 
+ } + + private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) { + return inFlightBatch.firstOffset >= firstOffsetToCompare && inFlightBatch.lastOffset <= lastOffsetToCompare; + } + + // Visible for testing. Should only be used for testing purposes. + NavigableMap<Long, InFlightBatch> cachedState() { + return new ConcurrentSkipListMap<>(cachedState); + } + + /** + * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + */ + final class InFlightBatch { + // The offset of the first record in the batch that is fetched from the log. + private final long firstOffset; + // The last offset of the batch that is fetched from the log. + private final long lastOffset; + + // The in-flight state of the fetched records. If the offset state map is empty then inflightState + // determines the state of the complete batch else individual offset determines the state of + // the respective records. Review Comment: I wonder whether `inFlightState` would be better renamed to `batchState`. The point is that there are two ways of keeping track of the in-flight state - for the entire batch, or per-offset. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire( FetchPartitionData fetchPartitionData ) { log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null); + if (lastBatch == null) { + // Nothing to acquire. + return CompletableFuture.completedFuture(Collections.emptyList()); + } - CompletableFuture<List<AcquiredRecords>> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + // We require the first batch of records to get the base offset. Stop parsing further + // batches. 
+ RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next(); + lock.writeLock().lock(); + try { + long baseOffset = firstBatch.baseOffset(); + // Find the floor batch record for the request batch. The request batch could be + // for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch + // of 12-13. Hence, floor entry is fetched to find the sub-map. + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(baseOffset); + // We might find a batch with floor entry but not necessarily that batch has an overlap, + // if the request batch base offset is ahead of last offset from floor entry i.e. cached + // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap. + if (floorOffset != null && floorOffset.getValue().lastOffset >= baseOffset) { + baseOffset = floorOffset.getKey(); + } + // Validate if the fetch records are already part of existing batches and if available. + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true); + // No overlap with request offsets in the cache for in-flight records. Acquire the complete + // batch. + if (subMap.isEmpty()) { + log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", + groupId, topicIdPartition); + return CompletableFuture.completedFuture(Collections.singletonList( + acquireNewBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset()))); + } - return future; + log.trace("Overlap exists with in-flight records. Acquire the records if available for" + + " the share group: {}-{}", groupId, topicIdPartition); + List<AcquiredRecords> result = new ArrayList<>(); + // The fetched records are already part of the in-flight records. The records might + // be available for re-delivery hence try acquiring same. The request batches could + // be an exact match, subset or span over multiple already fetched batches. 
+ for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); + // Compute if the batch is a full match. + boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset()); + + if (!fullMatch || inFlightBatch.offsetState != null) { + log.trace("Subset or offset tracked batch record found for share partition," + + " batch: {} request offsets - first: {}, last: {} for the share" + + " group: {}-{}", inFlightBatch, firstBatch.baseOffset(), + lastBatch.lastOffset(), groupId, topicIdPartition); + if (inFlightBatch.offsetState == null) { + // Though the request is a subset of in-flight batch but the offset + // tracking has not been initialized yet which means that we could only + // acquire subset of offsets from the in-flight batch but only if the + // complete batch is available yet. Hence, do a pre-check to avoid exploding + // the in-flight offset tracking unnecessarily. + if (inFlightBatch.batchState() != RecordState.AVAILABLE) { + log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}" + + " skipping offset tracking for batch as well.", groupId, + topicIdPartition, inFlightBatch); + continue; + } + // The request batch is a subset or per offset state is managed hence update + // the offsets state in the in-flight batch. + inFlightBatch.maybeInitializeOffsetStateUpdate(); + } + acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result); + continue; + } + + // The in-flight batch is a full match hence change the state of the complete. 
+ if (inFlightBatch.batchState() != RecordState.AVAILABLE) { + log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}", + groupId, topicIdPartition, inFlightBatch); + continue; + } + + InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, memberId); + if (updateResult == null) { + log.info("Unable to acquire records for the batch: {} in share group: {}-{}", + inFlightBatch, groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, inFlightBatch.firstOffset(), inFlightBatch.lastOffset()); + // Set the acquisition lock timeout task for the batch. + inFlightBatch.updateAcquisitionLockTimeout(acquisitionLockTimeoutTask); + + result.add(new AcquiredRecords() + .setFirstOffset(inFlightBatch.firstOffset()) + .setLastOffset(inFlightBatch.lastOffset()) + .setDeliveryCount((short) inFlightBatch.batchDeliveryCount())); + } + + // Some of the request offsets are not found in the fetched batches. Acquire the + // missing records as well. + if (subMap.lastEntry().getValue().lastOffset < lastBatch.lastOffset()) { Review Comment: nit: Inconsistent access of `...getValue().lastOffset` and `...getValue().lastOffset()` in this block of code. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,37 +485,264 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. 
+ cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. 
+ AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Apply acquisition lock to acquired records. + * @param memberId The member id of the client that is putting the acquisition lock. + * @param firstOffset The first offset of the acquired records. + * @param lastOffset The last offset of the acquired records. */ Review Comment: nit: You're missing a parameter in the javadoc comment. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -238,8 +245,77 @@ public static RecordState forId(byte id) { * @return The next fetch offset that should be fetched from the leader. */ public long nextFetchOffset() { - // TODO: Implement the logic to compute the next fetch offset. - return 0; + /* + The logic for determining the next offset to fetch data from a Share Partition hinges on a + flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset + should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1. + The flag is set to true in the following cases: + 1. 
When some previously acquired records are acknowledged with type RELEASE. + 2. When the record lock duration expires for some acquired records. + 3. When some records are released on share session close. + The re-computation of next fetch offset is done by iterating over the cachedState and finding + the first available record. If no available record is found, then the next fetch offset is + set to Share Partition End Offset + 1 and findNextFetchOffset flag is set to false. + */ + lock.writeLock().lock(); + try { + // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false + if (!findNextFetchOffset.get()) { + if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset) { + // 1. When cachedState is empty, endOffset is set to the next offset of the last offset removed from + // batch, which is the next offset to be fetched. + // 2. When startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO, + // which is the next offset to be fetched. + return endOffset; Review Comment: I'm a bit surprised by the need to have these as two separate cases. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -310,37 +485,264 @@ private void initialize() { // TODO: Provide implementation to initialize the share partition. } + private AcquiredRecords acquireNewBatchRecords( + String memberId, + long firstOffset, + long lastOffset + ) { + lock.writeLock().lock(); + try { + // Schedule acquisition lock timeout for the batch. + AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset); + // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. 
+ cachedState.put(firstOffset, new InFlightBatch( + memberId, + firstOffset, + lastOffset, + RecordState.ACQUIRED, + 1, + timerTask)); + // if the cachedState was empty before acquiring the new batches then startOffset needs to be updated + if (cachedState.firstKey() == firstOffset) { + startOffset = firstOffset; + } + endOffset = lastOffset; + return new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount((short) 1); + } finally { + lock.writeLock().unlock(); + } + } + + private void acquireSubsetBatchRecords( + String memberId, + long requestFirstOffset, + long requestLastOffset, + InFlightBatch inFlightBatch, + List<AcquiredRecords> result + ) { + lock.writeLock().lock(); + try { + for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + // For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + if (offsetState.getKey() < requestFirstOffset) { + continue; + } + + if (offsetState.getKey() > requestLastOffset) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.AVAILABLE) { + log.trace("The offset is not available skipping, offset: {} batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + + InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, + memberId); + if (updateResult == null) { + log.trace("Unable to acquire records for the offset: {} in batch: {}" + + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + // Schedule acquisition lock timeout for the offset. 
+ AcquisitionLockTimerTask acquisitionLockTimeoutTask = scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), offsetState.getKey()); + // Update acquisition lock timeout task for the offset. + offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask); + + // TODO: Maybe we can club the continuous offsets here. + result.add(new AcquiredRecords() + .setFirstOffset(offsetState.getKey()) + .setLastOffset(offsetState.getKey()) + .setDeliveryCount((short) offsetState.getValue().deliveryCount)); + } + } finally { + lock.writeLock().unlock(); + } + } + + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { + return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); + } + + // TODO: maxDeliveryCount should be utilized here once it is implemented /** - * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + * Apply acquisition lock to acquired records. + * @param memberId The member id of the client that is putting the acquisition lock. + * @param firstOffset The first offset of the acquired records. + * @param lastOffset The last offset of the acquired records. */ - private static class InFlightBatch { - /** - * The offset of the first record in the batch that is fetched from the log. 
- */ + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset, long delayMs) { + AcquisitionLockTimerTask acquistionLockTimerTask = acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs); + timer.add(acquistionLockTimerTask); + return acquistionLockTimerTask; + } + + private AcquisitionLockTimerTask acquisitionLockTimerTask(String memberId, long firstOffset, long lastOffset, long delayMs) { + return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset); + } + + private final class AcquisitionLockTimerTask extends TimerTask { + private final long expirationMs; + private final String memberId; private final long firstOffset; - /** - * The last offset of the batch that is fetched from the log. - */ private final long lastOffset; + + AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) { + super(delayMs); + this.expirationMs = time.hiResClockMs() + delayMs; + this.memberId = memberId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + + long expirationMs() { + return expirationMs; + } + /** - * The in-flight state of the fetched records. If the offset state map is empty then inflightState - * determines the state of the complete batch else individual offset determines the state of - * the respective records. + * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. */ + @Override + public void run() { + releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); + } + } + + private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { + // TODO: Implement the logic to release the acquisition lock on timeout. 
+ } + + private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetToCompare, long lastOffsetToCompare) { + return inFlightBatch.firstOffset >= firstOffsetToCompare && inFlightBatch.lastOffset <= lastOffsetToCompare; + } + + // Visible for testing. Should only be used for testing purposes. + NavigableMap<Long, InFlightBatch> cachedState() { + return new ConcurrentSkipListMap<>(cachedState); + } + + /** + * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + */ + final class InFlightBatch { + // The offset of the first record in the batch that is fetched from the log. + private final long firstOffset; + // The last offset of the batch that is fetched from the log. + private final long lastOffset; + + // The in-flight state of the fetched records. If the offset state map is empty then inflightState + // determines the state of the complete batch else individual offset determines the state of + // the respective records. private InFlightState inFlightState; - InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state, int deliveryCount) { + // The subset of offsets within the batch that holds different record states within the parent + // fetched batch. This is used to maintain the state of the records per offset, it is complex + // to maintain the subset batch (InFlightBatch) within the parent batch itself as the nesting Review Comment: I'm not sure I understand the point about nesting in this comment. ########## core/src/main/java/kafka/server/SharePartition.java: ########## @@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire( FetchPartitionData fetchPartitionData ) { log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null); + if (lastBatch == null) { + // Nothing to acquire. 
+ return CompletableFuture.completedFuture(Collections.emptyList()); + } - CompletableFuture<List<AcquiredRecords>> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + // We require the first batch of records to get the base offset. Stop parsing further + // batches. + RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next(); + lock.writeLock().lock(); + try { + long baseOffset = firstBatch.baseOffset(); + // Find the floor batch record for the request batch. The request batch could be + // for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch + // of 12-13. Hence, floor entry is fetched to find the sub-map. + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(baseOffset); + // We might find a batch with floor entry but not necessarily that batch has an overlap, + // if the request batch base offset is ahead of last offset from floor entry i.e. cached + // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap. + if (floorOffset != null && floorOffset.getValue().lastOffset >= baseOffset) { + baseOffset = floorOffset.getKey(); + } + // Validate if the fetch records are already part of existing batches and if available. + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true); + // No overlap with request offsets in the cache for in-flight records. Acquire the complete + // batch. + if (subMap.isEmpty()) { + log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", + groupId, topicIdPartition); + return CompletableFuture.completedFuture(Collections.singletonList( + acquireNewBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset()))); + } - return future; + log.trace("Overlap exists with in-flight records. 
Acquire the records if available for" + + " the share group: {}-{}", groupId, topicIdPartition); + List<AcquiredRecords> result = new ArrayList<>(); + // The fetched records are already part of the in-flight records. The records might + // be available for re-delivery hence try acquiring same. The request batches could + // be an exact match, subset or span over multiple already fetched batches. + for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); + // Compute if the batch is a full match. + boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset()); + + if (!fullMatch || inFlightBatch.offsetState != null) { + log.trace("Subset or offset tracked batch record found for share partition," + + " batch: {} request offsets - first: {}, last: {} for the share" + + " group: {}-{}", inFlightBatch, firstBatch.baseOffset(), + lastBatch.lastOffset(), groupId, topicIdPartition); + if (inFlightBatch.offsetState == null) { + // Though the request is a subset of in-flight batch but the offset + // tracking has not been initialized yet which means that we could only + // acquire subset of offsets from the in-flight batch but only if the + // complete batch is available yet. Hence, do a pre-check to avoid exploding + // the in-flight offset tracking unnecessarily. + if (inFlightBatch.batchState() != RecordState.AVAILABLE) { + log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}" + + " skipping offset tracking for batch as well.", groupId, + topicIdPartition, inFlightBatch); + continue; + } + // The request batch is a subset or per offset state is managed hence update + // the offsets state in the in-flight batch. 
+ inFlightBatch.maybeInitializeOffsetStateUpdate(); + } + acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result); + continue; + } + + // The in-flight batch is a full match hence change the state of the complete. Review Comment: nit: "of the complete" is not grammatical. I think "of the complete batch" was intended. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org