junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1811067709


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +197,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in 
the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> 
replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData,
+                                                                      boolean 
hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        boolean minBytesSatisfied = false;
+        Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(

Review Comment:
   > Though it's an ideal solution but the next fetch offset being prior to 
endOffset should be rare i.e. when some records are released or timedout. So I 
think we can avoid calculating non-acquirable and proceed to fetch anyways if 
our criteria from end offset to HWM meets. We can have the min bytes check 
after the fetch as currently in the PR. Wdyt?
   
   Yes, this sounds reasonable. We can just ignore the non-acquirable records 
for now.
   
   > You mean the file position of the latest offset that was fetched for the 
share partition, right?
   
   Basically, we need to maintain endOffset as a `LogOffsetMetadata`, which 
contains segment position. We can then use `LogOffsetMetadata.positionDiff` to 
calculate the available bytes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to