junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1809669520
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +197,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        boolean minBytesSatisfied = false;
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
Review Comment:
1. This approach is ok, but probably not the most efficient.
`replicaManager.readFromLog` is relatively expensive. To avoid calling it on
every HWM change, the DelayedFetch maintains the file position of the fetch
offset and compares it with the file position of HWM to estimate the fetchable
bytes. We could potentially do the same thing here. This requires us to
maintain the file position for SharePartition.endOffset.
2. Ideally, we need to take into account the size of those non-acquirable
batches in SharePartition when estimating the fetchable bytes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]