chia7712 commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1807825050


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -513,18 +514,27 @@ public long nextFetchOffset() {
      * fetched from the leader.
      *
      * @param memberId           The member id of the client that is fetching the record.
+     * @param maxFetchRecords    The maximum number of records that should be acquired, this is a soft
+     *                           limit and the method might acquire more records than the maxFetchRecords,
+     *                           if the records are already part of the same fetch batch.
      * @param fetchPartitionData The fetched records for the share partition.
      * @return The acquired records for the share partition.
      */
-    public List<AcquiredRecords> acquire(
+    public ShareAcquiredRecords acquire(
         String memberId,
+        int maxFetchRecords,
         FetchPartitionData fetchPartitionData
     ) {
         log.trace("Received acquire request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId);
+        if (maxFetchRecords <= 0) {
+            // Nothing to acquire.

Review Comment:
   I guess this is temporary, right? We should not allow `maxFetchRecords <= 0` in the future.
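   If it does become a hard requirement, the check could turn into a precondition, e.g. (a sketch of one possible future guard, not code from this PR):
   ```java
           if (maxFetchRecords <= 0) {
               throw new IllegalArgumentException("maxFetchRecords must be positive, got: " + maxFetchRecords);
           }
   ```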



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,24 +88,31 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
                     partitionData.setErrorMessage(Errors.NONE.message());
                 }
             } else {
-                List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+                ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
                 log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                    topicIdPartition, shareFetchData, acquiredRecords);
+                    topicIdPartition, shareFetchData, shareAcquiredRecords);
                 // Maybe, in the future, check if no records are acquired, and we want to retry
                 // replica manager fetch. Depends on the share partition manager implementation,
                 // if we want parallel requests for the same share partition or not.
-                if (acquiredRecords.isEmpty()) {
+                if (shareAcquiredRecords.records().isEmpty()) {
                     partitionData
                         .setRecords(null)
                         .setAcquiredRecords(Collections.emptyList());
                 } else {
                     partitionData
+                        // We set the records to the fetchPartitionData records. We do not alter the records
+                        // fetched from the replica manager as they follow zero copy buffer. The acquired records

Review Comment:
   Out of curiosity, how should users configure `max.poll.records` to minimize network I/O waste? Should users set `max.poll.records=Infinite`? If so, should we consider making an infinite value the default for `max.poll.records`?
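   For reference, the closest a client can get today is an effectively unbounded cap, e.g. (a sketch; `Integer.MAX_VALUE` stands in for "Infinite", which the config does not accept as a literal):
   ```java
           Properties props = new Properties();
           // max.poll.records only caps the records returned per poll() call.
           props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
   ```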



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1658,6 +1706,26 @@ private long findLastOffsetAcknowledged() {
         return lastOffsetAcknowledged;
     }
 
+    /**
+     * Find the last offset from the batch which contains the request offset. If found, return the last offset
+     * of the batch, otherwise return the request offset.
+     *
+     * @param batches The batches to search for the request offset.
+     * @param offset The request offset to find.
+     * @return The last offset of the batch which contains the request offset, otherwise the request offset.
+     */
+    private long findLastOffsetFromBatchWithRequestOffset(
+        Iterable<? extends RecordBatch> batches,
+        long offset
+    ) {
+        for (RecordBatch batch : batches) {
+            if (offset >= batch.baseOffset() && offset <= batch.lastOffset()) {

Review Comment:
   `lastOffset` loads the entire batch header, so perhaps we should avoid checking `lastOffset` for every batch. For example:
   ```java
           RecordBatch previous = null;
           for (RecordBatch batch : batches) {
               if (offset >= batch.baseOffset()) {
                   previous = batch;
                   continue;
               }
               break;
           }
           if (previous != null && offset <= previous.lastOffset()) return previous.lastOffset();
           return offset;
   ```
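   (This assumes `batches` iterates in ascending offset order, as log batches do, so the loop stops at the first batch starting past `offset` and at most one batch header is ever loaded.)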



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -612,16 +634,20 @@ public List<AcquiredRecords> acquire(
                     .setFirstOffset(inFlightBatch.firstOffset())
                     .setLastOffset(inFlightBatch.lastOffset())
                     .setDeliveryCount((short) inFlightBatch.batchDeliveryCount()));
+                acquiredCount += (int) (inFlightBatch.lastOffset() - inFlightBatch.firstOffset() + 1);

Review Comment:
   I suppose this `acquiredCount` may also be inaccurate if the topic is compacted, right?
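   For illustration, the offset span can exceed the number of records actually present (a sketch; `batch` is a hypothetical `RecordBatch` read from a compacted segment):
   ```java
           // Compaction removes records but preserves offsets, so a batch spanning
           // offsets 10..19 may physically contain, say, only 3 records.
           long offsetSpan = batch.lastOffset() - batch.baseOffset() + 1; // 10
           int actualRecords = 0;
           for (Record record : batch) { // RecordBatch is Iterable<Record>
               actualRecords++;
           }
           // offsetSpan >= actualRecords; acquiredCount += offsetSpan overcounts the gap.
   ```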


