apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1810373760
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +197,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+ /**
+ * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+ */
+ Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+ boolean hasRequestTimedOut) {
+ log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+ shareFetchData.groupId(), shareFetchData.fetchParams());
+ boolean minBytesSatisfied = false;
+ Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+ try {
+ Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
Review Comment:
Thanks a lot for the suggestion, @junrao. This is good.
I have a question on 2.
> Ideally, we need to take into account the size of those non-acquirable
batches in SharePartition when estimating the fetchable bytes.
Though that is the ideal solution, the next fetch offset being prior to
endOffset should be rare, i.e. only when some records are released or timed
out. So I think we can avoid calculating the non-acquirable bytes and proceed
to fetch anyway if the bytes from the end offset to the HWM meet our minBytes
criteria. We can keep the min bytes check after the fetch, as currently in the
PR. Wdyt?
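To make the proposal concrete, here is a rough sketch of the two checks. It is not code from the PR: the class, the method names, and the idea of modelling the end offset and HWM as plain byte positions are all assumptions for illustration. Completing on timeout regardless of size is also an assumption, suggested only by the `hasRequestTimedOut` parameter in the diff.

```java
/**
 * Hypothetical sketch, not code from the PR. All names and the
 * byte-position model are assumptions made for illustration.
 */
final class MinBytesEstimateSketch {

    private MinBytesEstimateSketch() { }

    /**
     * Pre-fetch estimate: bytes between the share partition's end offset and
     * the high watermark (HWM). It deliberately ignores any non-acquirable
     * batches that sit before the end offset, since that case should be rare.
     */
    static long estimatedFetchableBytes(long endOffsetPosition, long hwmPosition) {
        // Clamp at zero in case the end offset is already at or past the HWM.
        return Math.max(0L, hwmPosition - endOffsetPosition);
    }

    /** Proceed to the log read only if the estimate meets minBytes. */
    static boolean shouldAttemptFetch(long endOffsetPosition, long hwmPosition, int minBytes) {
        return estimatedFetchableBytes(endOffsetPosition, hwmPosition) >= minBytes;
    }

    /**
     * Post-fetch check: the estimate can be wrong, so the bytes actually read
     * are compared against minBytes again. Assumption: a timed-out request
     * completes with whatever was read.
     */
    static boolean canCompleteAfterFetch(long fetchedBytes, int minBytes, boolean hasRequestTimedOut) {
        return hasRequestTimedOut || fetchedBytes >= minBytes;
    }
}
```

The pre-fetch estimate only gates whether we attempt the read. The post-fetch check still decides whether the request can be completed, so an over-optimistic estimate costs one extra read rather than an undersized response.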