apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1933731091
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -633,11 +661,13 @@ public ShareAcquiredRecords acquire(
                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
        }
-       log.trace("Overlap exists with in-flight records. Acquire the records if available for"
-           + " the share partition: {}-{}", groupId, topicIdPartition);
-       List<AcquiredRecords> result = new ArrayList<>();
        // The acquired count is used to track the number of records acquired for the request.
        int acquiredCount = 0;
+       List<AcquiredRecords> result = new ArrayList<>();
+
+       log.trace("Overlap exists with in-flight records. Acquire the records if available for"
+           + " the share partition: {}-{}", groupId, topicIdPartition);
+

Review Comment:
   This change is not required, as it is the same as before.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire(
            }
            InFlightBatch inFlightBatch = entry.getValue();
+
+           // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
+           // Thus, a new batch needs to be acquired for the gap.
+           if (baseOffset < entry.getKey()) {
+               // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+               // inFlight batch in the cachedState
+               if (lastBatch.lastOffset() < (entry.getKey())) {
+                   // The entire request batch is part of the gap
+                   return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                       baseOffset, lastBatch.lastOffset(), batchSize, maxFetchRecords);

Review Comment:
   And why do we just return in that scenario? This seems not right.
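The branch under discussion depends on where the fetched range falls relative to the next cached in-flight batch. Below is a minimal standalone sketch of that gap check, using a plain `TreeMap` as a hypothetical stand-in for the share partition's `cachedState` (the `classifyFetch` helper and the offsets are illustrative only, not the PR's actual code):

```java
import java.util.Map;
import java.util.TreeMap;

public class GapCheckSketch {
    // Hypothetical stand-in for cachedState: key = batch base offset, value = batch last offset.
    static String classifyFetch(TreeMap<Long, Long> cachedState, long baseOffset, long lastOffset) {
        // First in-flight batch starting at or after the fetch's base offset.
        Map.Entry<Long, Long> entry = cachedState.ceilingEntry(baseOffset);
        if (entry == null) {
            return "no overlap: acquire whole fetch as a new batch";
        }
        if (baseOffset < entry.getKey()) {
            if (lastOffset < entry.getKey()) {
                // The entire fetched range sits in the gap before the next cached batch.
                return "entire fetch in gap";
            }
            // The fetch starts in the gap but spills into the next cached batch.
            return "partial gap, overlaps batch at " + entry.getKey();
        }
        return "fetch starts inside cached batch at " + entry.getKey();
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> cachedState = new TreeMap<>();
        cachedState.put(10L, 19L);   // in-flight batch covering offsets 10..19
        cachedState.put(30L, 39L);   // in-flight batch covering offsets 30..39

        System.out.println(classifyFetch(cachedState, 20, 25)); // entire fetch in gap
        System.out.println(classifyFetch(cachedState, 20, 35)); // partial gap, overlaps batch at 30
        System.out.println(classifyFetch(cachedState, 40, 50)); // no overlap
    }
}
```

The "entire fetch in gap" case is the one the reviewer flags: returning there acquires only the gap records and never inspects any cached batches beyond the fetch range.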
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1758,6 +1837,11 @@ private void maybeUpdateCachedStateAndOffsets() {
            return;
        }
+
+       // if there is an acquirable gap initially, then we should not move the startOffset
+       if (initialReadGapOffset != null && initialReadGapOffset.gapStartOffset() == startOffset) {
+           return;
+       }

Review Comment:
   So this is because a gap is present and the messages in the cache after it are acknowledged, correct? But does this solve all cases? Also, won't `canMoveStartOffset` return false in this scenario anyway?

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire(
            }
            InFlightBatch inFlightBatch = entry.getValue();
+
+           // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
+           // Thus, a new batch needs to be acquired for the gap.
+           if (baseOffset < entry.getKey()) {
+               // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+               // inFlight batch in the cachedState
+               if (lastBatch.lastOffset() < (entry.getKey())) {

Review Comment:
   Can this scenario ever occur? Can you help me with an example, please?

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire(
            }
            InFlightBatch inFlightBatch = entry.getValue();
+
+           // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
+           // Thus, a new batch needs to be acquired for the gap.
+           if (baseOffset < entry.getKey()) {
+               // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+               // inFlight batch in the cachedState
+               if (lastBatch.lastOffset() < (entry.getKey())) {
+                   // The entire request batch is part of the gap
+                   return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                       baseOffset, lastBatch.lastOffset(), batchSize, maxFetchRecords);
+               } else {
+                   result.add(new AcquiredRecords()
+                       .setFirstOffset(baseOffset)
+                       .setLastOffset(entry.getKey() - 1)
+                       .setDeliveryCount((short) 1));
+                   acquiredCount += (int) (entry.getKey() - baseOffset);
+                   acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                       baseOffset, entry.getKey() - 1, batchSize, maxFetchRecords);
+                   baseOffset = inFlightBatch.lastOffset() + 1;
+                   continue;
+               }
+           }
+           baseOffset = inFlightBatch.lastOffset() + 1;

Review Comment:
   Do not modify the baseOffset; you should use another variable, such as `maybeGapOffset`.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1852,14 +1947,22 @@ private long findLastOffsetAcknowledged() {
        long lastOffsetAcknowledged = -1;
        try {
            for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
+               if (initialReadGapOffset != null && entry.getKey() >= initialReadGapOffset.gapStartOffset()) {
+                   break;
+               }
                InFlightBatch inFlightBatch = entry.getValue();
                if (inFlightBatch.offsetState() == null) {
                    if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
                        return lastOffsetAcknowledged;
                    }
-                   lastOffsetAcknowledged = inFlightBatch.lastOffset();
+                   if (initialReadGapOffset == null || inFlightBatch.lastOffset() < initialReadGapOffset.gapStartOffset()) {
+                       lastOffsetAcknowledged = inFlightBatch.lastOffset();
+                   }
                } else {
                    for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+                       if (initialReadGapOffset != null && offsetState.getKey() >= initialReadGapOffset.gapStartOffset()) {
+                           break;
+                       }

Review Comment:
   Can we write comments here, please? Do we really require three checks? The only thing you need is to identify when to stop: either when you see a gap or unacknowledged data.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire(
            }
            InFlightBatch inFlightBatch = entry.getValue();
+
+           // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
+           // Thus, a new batch needs to be acquired for the gap.
+           if (baseOffset < entry.getKey()) {
+               // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+               // inFlight batch in the cachedState
+               if (lastBatch.lastOffset() < (entry.getKey())) {

Review Comment:
   ```suggestion
               if (lastBatch.lastOffset() < entry.getKey()) {
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
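The reviewer's proposed simplification of `findLastOffsetAcknowledged` (stop the scan at the first gap or the first unacknowledged batch, rather than three separate checks) could look roughly like the standalone sketch below. `Batch`, its `acked` flag, and the offsets are hypothetical stand-ins for the PR's `InFlightBatch` state, not the actual implementation:

```java
import java.util.TreeMap;

public class LastAckedSketch {
    // Hypothetical stand-in for an in-flight batch: [baseOffset..lastOffset] plus an acknowledged flag.
    record Batch(long baseOffset, long lastOffset, boolean acked) {}

    static long findLastOffsetAcknowledged(TreeMap<Long, Batch> cachedState, long gapStartOffset) {
        long lastAcked = -1;
        long expectedNext = cachedState.isEmpty() ? -1 : cachedState.firstKey();
        for (Batch batch : cachedState.values()) {
            // Single stop condition: a gap (initial read gap or hole between batches) or unacknowledged data.
            if (batch.baseOffset() >= gapStartOffset   // reached the initial read gap
                || batch.baseOffset() != expectedNext  // hole between cached batches
                || !batch.acked()) {                   // unacknowledged data
                break;
            }
            lastAcked = batch.lastOffset();
            expectedNext = batch.lastOffset() + 1;
        }
        return lastAcked;
    }

    public static void main(String[] args) {
        TreeMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(10L, new Batch(10, 19, true));
        cachedState.put(20L, new Batch(20, 29, true));
        cachedState.put(40L, new Batch(40, 49, true)); // hole at 30..39

        // The scan stops at the hole before offset 40, so 29 is the last acknowledged offset.
        System.out.println(findLastOffsetAcknowledged(cachedState, Long.MAX_VALUE)); // 29
        // An earlier initial read gap caps the scan sooner.
        System.out.println(findLastOffsetAcknowledged(cachedState, 20)); // 19
    }
}
```

This collapses the gap and acknowledgement checks into one loop-exit condition, which is the shape the review comment asks for; the real code would additionally need the per-offset (`offsetState`) granularity shown in the diff.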