apoorvmittal10 commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1807767533
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
private AcquiredRecords acquireNewBatchRecords(
String memberId,
+ Iterable<? extends RecordBatch> batches,
long firstOffset,
- long lastOffset
+ long lastOffset,
+ int maxFetchRecords
) {
lock.writeLock().lock();
try {
+ // If same batch is fetched and previous batch is removed from the cache then we need to
+ // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset.
+ // For an initial start of the share fetch from a topic partition the endOffset will be initialized
+ // to 0 but firstOffset can be higher than 0.
+ long firstAcquiredOffset = firstOffset;
+ if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
+ firstAcquiredOffset = endOffset;
Review Comment:
> Since cachedState is not empty, we won't set firstAcquiredOffset to 105
> and leave it as 100, which seems incorrect.
Yes, that's fine. In those cases we need not move firstAcquiredOffset, as we
are not handling a fresh batch acquisition, i.e. this check is only
important when we are acquiring the batch for the first time.
This method will not be triggered as the submap is not empty, i.e. the replica
manager batch [100, 100] will have an overlap with the [105, 110] cached batch.
The complete batch [105, 110] will be acquired as per the code from line
633, where the inflight batch is a full match and available to acquire.
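
To make the guard easier to reason about, here is a minimal, standalone sketch of just the condition from the diff. It is not the SharePartition implementation: the class name, the helper method, and the map of first offset to last offset standing in for cachedState are assumptions made for illustration. Only the `cachedState.isEmpty() && endOffset > firstAcquiredOffset` check is taken from the hunk above.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the guard discussed above; not Kafka code.
final class FirstAcquiredOffsetSketch {

    // cachedState is modeled as firstOffset -> lastOffset of each cached batch (an assumption).
    static long firstAcquiredOffset(NavigableMap<Long, Long> cachedState, long endOffset, long firstOffset) {
        long firstAcquiredOffset = firstOffset;
        // Same condition as in the diff: only move forward when nothing is cached
        // and endOffset is already ahead of the fetched batch's first offset.
        if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
            firstAcquiredOffset = endOffset;
        }
        return firstAcquiredOffset;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> empty = new TreeMap<>();
        NavigableMap<Long, Long> cached = new TreeMap<>();
        cached.put(105L, 110L); // cached batch [105, 110]

        // Empty cache, endOffset ahead of the batch: start from endOffset.
        System.out.println(firstAcquiredOffset(empty, 105L, 100L));  // prints 105
        // Non-empty cache: the guard does not apply, firstOffset is kept.
        System.out.println(firstAcquiredOffset(cached, 105L, 100L)); // prints 100
        // Initial start: endOffset is 0, firstOffset is higher, firstOffset is kept.
        System.out.println(firstAcquiredOffset(empty, 0L, 50L));     // prints 50
    }
}
```

The second case mirrors the scenario in the quoted question: with a cached batch present, the value stays at 100, and per the reply above the overlapping cached batch is handled on the other acquisition path rather than here.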