junrao commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1806900045
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
private AcquiredRecords acquireNewBatchRecords(
String memberId,
+ Iterable<? extends RecordBatch> batches,
long firstOffset,
- long lastOffset
+ long lastOffset,
+ int maxFetchRecords
) {
lock.writeLock().lock();
try {
+ // If same batch is fetched and previous batch is removed from the cache then we need to
+ // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset.
+ // For an initial start of the share fetch from a topic partition the endOffset will be initialized
+ // to 0 but firstOffset can be higher than 0.
+ long firstAcquiredOffset = firstOffset;
+ if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
+ firstAcquiredOffset = endOffset;
Review Comment:
Is that true? Suppose endOffset is initialized to 105 and the log has two batches
with offset ranges [100, 110] and [111, 120]. Initially cachedState is empty and we
call acquireNewBatchRecords with the first batch. We will set firstAcquiredOffset to
105 in this case, so [105, 110] will be in acquired state and endOffset advances to
111. Another share fetch request comes in and we call acquireNewBatchRecords with
the second batch; [111, 120] will be in acquired state and endOffset advances to
121. Then the first batch times out: [105, 110] moves to available state and
endOffset is reset to 105. Now another share fetch request comes in and we call
acquireNewBatchRecords with the first batch again. Since cachedState is not empty,
we won't set firstAcquiredOffset to 105 and will leave it as 100, which seems
incorrect.
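
To make the scenario concrete, here is a rough, self-contained sketch of just the timeline above. It is not SharePartition code; the `cachedState` map, the `State` enum and the timeout step are simplified stand-ins, and only the `cachedState.isEmpty()` guard quoted in the hunk is mirrored:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy trace of the scenario above; illustrative only, not SharePartition logic.
public class AcquireGuardScenario {

    enum State { ACQUIRED, AVAILABLE }

    // batch first offset -> batch state (stand-in for the real cachedState map)
    static final NavigableMap<Long, State> cachedState = new TreeMap<>();
    static long endOffset = 105L; // endOffset initialized to 105 per the example

    // Mirrors the guard under review: bump firstOffset up to endOffset only when the cache is empty.
    static long acquireNewBatch(long firstOffset, long lastOffset) {
        long firstAcquiredOffset = firstOffset;
        if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
            firstAcquiredOffset = endOffset;
        }
        cachedState.put(firstAcquiredOffset, State.ACQUIRED);
        endOffset = lastOffset + 1;
        return firstAcquiredOffset;
    }

    public static void main(String[] args) {
        System.out.println("fetch [100,110] acquired from " + acquireNewBatch(100, 110)); // 105, endOffset -> 111
        System.out.println("fetch [111,120] acquired from " + acquireNewBatch(111, 120)); // 111, endOffset -> 121

        // First batch times out: [105,110] flips to available and endOffset is reset to 105.
        cachedState.put(105L, State.AVAILABLE);
        endOffset = 105L;

        // Re-fetch of the first batch: cachedState is non-empty, so the guard does not fire
        // and the batch is acquired from offset 100, overlapping the cached [105,110] entry.
        System.out.println("fetch [100,110] acquired from " + acquireNewBatch(100, 110)); // 100
    }
}
```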
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1059,36 +1085,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
private AcquiredRecords acquireNewBatchRecords(
String memberId,
+ Iterable<? extends RecordBatch> batches,
long firstOffset,
- long lastOffset
+ long lastOffset,
+ int maxFetchRecords
) {
lock.writeLock().lock();
try {
+ // If same batch is fetched and previous batch is removed from the cache then we need to
+ // update the batch first offset to endOffset, only if endOffset is passed the firstOffset.
Review Comment:
endOffset is passed => endOffset passed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]