junrao commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1803740450


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
 
     private AcquiredRecords acquireNewBatchRecords(
         String memberId,
+        Iterable<? extends RecordBatch> batches,
         long firstOffset,
-        long lastOffset
+        long lastOffset,
+        int maxFetchRecords
     ) {
         lock.writeLock().lock();
         try {
+            // If same batch is fetched and previous batch is removed from the cache then we need to
+            // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset.

Review Comment:
   Typo: `enfOffset` should be `endOffset`.
   Also, "endOffset is passed" => "endOffset passed".



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -219,6 +227,7 @@ private SharePartitionManager(
         this.delayedActionsQueue = delayedActionsQueue;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
+        this.maxFetchRecords = shareFetchMaxFetchRecords;

Review Comment:
   Instead of duplicating the code between the two constructors, could we just forward one constructor to the other?
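   For reference, this is ordinary Java constructor chaining with `this(...)`. A minimal sketch, assuming a class with only two of the fields; `PartitionManagerSketch` and `DEFAULT_MAX_FETCH_RECORDS` are hypothetical names, not the real `SharePartitionManager` signature:

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for SharePartitionManager. Only two of its
// fields are modeled; the parameter lists are assumptions, not Kafka's.
final class PartitionManagerSketch {
    // Hypothetical default; not Kafka's actual configuration value.
    static final int DEFAULT_MAX_FETCH_RECORDS = 500;

    private final Object groupConfigManager;
    private final int maxFetchRecords;

    // Shorter constructor: delegates with this(...) instead of repeating the
    // field assignments, so a new field only has to be wired up once.
    PartitionManagerSketch(Object groupConfigManager) {
        this(groupConfigManager, DEFAULT_MAX_FETCH_RECORDS);
    }

    // Full constructor: the single place where every field is assigned.
    PartitionManagerSketch(Object groupConfigManager, int shareFetchMaxFetchRecords) {
        this.groupConfigManager = Objects.requireNonNull(groupConfigManager);
        this.maxFetchRecords = shareFetchMaxFetchRecords;
    }

    Object groupConfigManager() {
        return groupConfigManager;
    }

    int maxFetchRecords() {
        return maxFetchRecords;
    }
}
```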



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
 
     private AcquiredRecords acquireNewBatchRecords(
         String memberId,
+        Iterable<? extends RecordBatch> batches,
         long firstOffset,
-        long lastOffset
+        long lastOffset,
+        int maxFetchRecords
     ) {
         lock.writeLock().lock();
         try {
+            // If same batch is fetched and previous batch is removed from the cache then we need to
+            // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset.
+            // For an initial start of the share fetch from a topic partition the endOffset will be initialized
+            // to 0 but firstOffset can be higher than 0.
+            long firstAcquiredOffset = firstOffset;
+            if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
+                firstAcquiredOffset = endOffset;

Review Comment:
   It's possible for `endOffset > firstAcquiredOffset` even when 
`cachedState.isEmpty()` is false, right? In that case, we still want to bound 
firstAcquiredOffset by endOffset.
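   If the bound should hold regardless of whether the cache is empty, the guard reduces to a `Math.max`. A minimal sketch, assuming `endOffset` keeps the meaning it has in the diff; `FirstOffsetSketch` is a hypothetical helper, and whether unconditional clamping is correct depends on how `SharePartition` maintains `endOffset`:

```java
final class FirstOffsetSketch {
    /**
     * Returns the offset to start acquiring from. The fetched batch's first
     * offset is clamped to endOffset unconditionally, i.e. without the
     * cachedState.isEmpty() guard from the diff. Math.max also covers the
     * initial case where endOffset is 0 but firstOffset is higher.
     */
    static long firstAcquiredOffset(long firstOffset, long endOffset) {
        return Math.max(firstOffset, endOffset);
    }
}
```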



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
 
     private AcquiredRecords acquireNewBatchRecords(
         String memberId,
+        Iterable<? extends RecordBatch> batches,
         long firstOffset,
-        long lastOffset
+        long lastOffset,
+        int maxFetchRecords
     ) {
         lock.writeLock().lock();
         try {
+            // If same batch is fetched and previous batch is removed from the cache then we need to
+            // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset.
+            // For an initial start of the share fetch from a topic partition the endOffset will be initialized
+            // to 0 but firstOffset can be higher than 0.
+            long firstAcquiredOffset = firstOffset;
+            if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) {
+                firstAcquiredOffset = endOffset;
+            }
+
+            // Check how many messages can be acquired from the batch.
+            long lastAcquiredOffset = lastOffset;
+            if (maxFetchRecords < lastAcquiredOffset - firstAcquiredOffset + 1) {
+                // The max messages to acquire is less than the complete available batches hence
+                // limit the acquired records. The last offset shall be the batches last offset
+                // which falls under the max messages limit. As the max fetch records is the soft
+                // limit, the last offset can be higher than the max messages.
+                lastAcquiredOffset = findLastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);

Review Comment:
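   `firstAcquiredOffset + maxFetchRecords - 1` won't be accurate for compacted topics, since compaction leaves gaps in the offsets and that range can then hold fewer than `maxFetchRecords` records. Not sure if there is a simple solution though.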
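   One possible direction is to count records instead of doing offset arithmetic, so gaps left by compaction do not shrink the limit. A minimal sketch, which walks the batches, counts the records that actually exist from `firstAcquiredOffset`, and (like the diff) rounds up to the containing batch's last offset because the limit is soft. `Batch` and `lastOffsetForRecordLimit` are hypothetical stand-ins, not Kafka's `RecordBatch` API; on the real fetch path, iterating individual records may mean decompressing batches, which is one reason this is not a drop-in fix:

```java
import java.util.List;

final class CompactedLimitSketch {
    /** Hypothetical stand-in for a batch: the offsets that survived compaction, ascending. */
    static final class Batch {
        final long[] recordOffsets;

        Batch(long... recordOffsets) {
            this.recordOffsets = recordOffsets;
        }

        // Simplification: the last surviving record's offset is used as the batch's last offset.
        long lastOffset() {
            return recordOffsets[recordOffsets.length - 1];
        }
    }

    /**
     * Counts actual records (not offsets) from firstAcquiredOffset and returns the
     * last offset of the batch containing the maxFetchRecords-th record. If fewer
     * records exist, returns the last offset of the final batch that had any.
     * Returns -1 if no record is at or after firstAcquiredOffset.
     */
    static long lastOffsetForRecordLimit(List<Batch> batches, long firstAcquiredOffset, int maxFetchRecords) {
        long count = 0;
        long last = -1;
        for (Batch batch : batches) {
            for (long offset : batch.recordOffsets) {
                if (offset < firstAcquiredOffset) {
                    continue;
                }
                count++;
                last = batch.lastOffset();
                if (count >= maxFetchRecords) {
                    return last;
                }
            }
        }
        return last;
    }
}
```

   With batches holding offsets {0, 3, 7}, {10, 11} and {20}, a limit of 4 records from offset 0 ends at 11, whereas `0 + 4 - 1 = 3` would stop in the first batch after only 3 records.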



##########
share/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The ShareAcquiredRecords class is used to send the acquired records and associated metadata.
+ */
+public class ShareAcquiredRecords {
+
+    public static final ShareAcquiredRecords EMPTY_SHARE_ACQUIRED_RECORDS = new ShareAcquiredRecords();
+
+    /**
+     * The list of acquired records.
+     */
+    private final List<AcquiredRecords> records;
+    /**
+      * The number of offsets acquired. The acquired records has a first and last offset, and the count
+      * is the actual number of offsets acquired.
+     */
+    private final int count;
+
+    public ShareAcquiredRecords(
+        List<AcquiredRecords> records,
+        int count
+    ) {
+        this.records = records;
+        this.count = count;
+    }
+
+    private ShareAcquiredRecords() {
+        this.records = Collections.emptyList();
+        this.count = 0;
+    }
+
+    public List<AcquiredRecords> records() {

Review Comment:
   records => acquiredRecords ?
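   The `count` field above is documented as the actual number of offsets acquired, which can be smaller than the span from the first to the last acquired offset when the acquired ranges are not contiguous. A minimal sketch of that relationship, assuming each entry is an inclusive offset range; `Range` is a hypothetical stand-in for `AcquiredRecords`, the count is derived here rather than passed in as in the diff, and the accessor is named `acquiredRecords()` to illustrate the suggested rename:

```java
import java.util.Collections;
import java.util.List;

final class AcquiredCountSketch {
    /** Hypothetical stand-in for AcquiredRecords: an inclusive offset range. */
    static final class Range {
        final long firstOffset;
        final long lastOffset;

        Range(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    private final List<Range> acquiredRecords;
    private final int count;

    AcquiredCountSketch(List<Range> acquiredRecords) {
        this.acquiredRecords = Collections.unmodifiableList(acquiredRecords);
        this.count = countOffsets(acquiredRecords);
    }

    /** Sums the sizes of the inclusive ranges; gaps between ranges are not counted. */
    static int countOffsets(List<Range> ranges) {
        long total = 0;
        for (Range r : ranges) {
            total += r.lastOffset - r.firstOffset + 1;
        }
        return Math.toIntExact(total);
    }

    // Accessor named per the review suggestion (records => acquiredRecords).
    List<Range> acquiredRecords() {
        return acquiredRecords;
    }

    int count() {
        return count;
    }
}
```

   For ranges [0, 4] and [10, 12] the count is 8, while the span from offset 0 to 12 covers 13 offsets.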



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,24 +88,31 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
                     partitionData.setErrorMessage(Errors.NONE.message());
                 }
             } else {
-                List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+                ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
                 log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                    topicIdPartition, shareFetchData, acquiredRecords);
+                    topicIdPartition, shareFetchData, shareAcquiredRecords);
                 // Maybe, in the future, check if no records are acquired, and we want to retry
                 // replica manager fetch. Depends on the share partition manager implementation,
                 // if we want parallel requests for the same share partition or not.
-                if (acquiredRecords.isEmpty()) {
+                if (shareAcquiredRecords.records().isEmpty()) {
                     partitionData
                         .setRecords(null)
                         .setAcquiredRecords(Collections.emptyList());
                 } else {
                     partitionData
+                        // We set the records to the fetchPartitionData records. We do not alter the records
+                        // fetched from the replica manager as they follow zero copy buffer. The acquired records

Review Comment:
   If `records` contains 10 batches and we only acquire records in the first 
batch, it's wasteful to return all 10 batches in the records.
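   Assuming the acquired records form one contiguous inclusive range, only the batches overlapping that range would need to be returned. A minimal sketch of that filter; `Batch` and `batchesToReturn` are hypothetical. Because the fetched records are a zero-copy buffer, a real change would have to slice the buffer by byte position rather than build a new list, so this only shows which batches to keep:

```java
import java.util.ArrayList;
import java.util.List;

final class TrimBatchesSketch {
    /** Hypothetical stand-in for a record batch: inclusive offset bounds only. */
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Keeps the batches that overlap [firstAcquired, lastAcquired] (inclusive).
     * Assumes batches are sorted by offset, so the scan stops at the first
     * batch that starts after the acquired range.
     */
    static List<Batch> batchesToReturn(List<Batch> batches, long firstAcquired, long lastAcquired) {
        List<Batch> result = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.baseOffset > lastAcquired) {
                break;
            }
            if (batch.lastOffset >= firstAcquired) {
                result.add(batch);
            }
        }
        return result;
    }
}
```

   With ten batches of ten offsets each and only offsets 0 to 9 acquired, this keeps one batch instead of ten.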



-- 
This is an automated message from the Apache Git Service.