junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1811067709
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +197,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }

-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        boolean minBytesSatisfied = false;
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(

Review Comment:
> Though it's the ideal solution, the next fetch offset being prior to endOffset should be rare, i.e. only when some records are released or timed out. So I think we can avoid calculating the non-acquirable records and proceed to fetch anyway if our criterion from endOffset to the HWM is met. We can have the min bytes check after the fetch, as currently in the PR. Wdyt?

Yes, this sounds reasonable. We can just ignore the non-acquirable records for now.
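
For concreteness, a minimal sketch of what the post-fetch min bytes check could look like. The helper class and method are hypothetical, not the PR's code; `FetchPartitionData`, `FetchParams`, and `Records.sizeInBytes()` are the existing Kafka types.

```java
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;

final class MinBytesCheckSketch {

    // Sum the bytes fetched across partitions and compare against the
    // request's minBytes; a timed-out request completes regardless.
    static boolean minBytesSatisfied(Map<TopicIdPartition, FetchPartitionData> responseData,
                                     FetchParams fetchParams,
                                     boolean hasRequestTimedOut) {
        if (hasRequestTimedOut)
            return true;
        long accumulatedBytes = 0L;
        for (FetchPartitionData partitionData : responseData.values())
            accumulatedBytes += partitionData.records.sizeInBytes();
        return accumulatedBytes >= fetchParams.minBytes;
    }
}
```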

> You mean the file position of the latest offset that was fetched for the share partition, right?

Basically, we need to maintain endOffset as a `LogOffsetMetadata`, which contains the segment position. We can then use `LogOffsetMetadata.positionDiff` to calculate the available bytes.
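
To make the idea concrete, a minimal sketch assuming the share partition keeps its endOffset as a full `LogOffsetMetadata` (i.e. with segment base offset and file position, not message offset only). `positionDiff`, `segmentBaseOffset`, and `messageOffset` are the real `LogOffsetMetadata` members, and the structure mirrors the bytes accounting in `DelayedFetch`; the helper class itself is hypothetical.

```java
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

final class AvailableBytesSketch {

    // Estimate the bytes available between the share partition's endOffset and
    // the high watermark, capped at the fetch request's maxBytes.
    static int availableBytes(LogOffsetMetadata endOffsetMetadata,
                              LogOffsetMetadata hwmMetadata,
                              int maxBytes) {
        if (hwmMetadata.messageOffset <= endOffsetMetadata.messageOffset)
            return 0; // nothing beyond the share partition's end offset yet
        if (endOffsetMetadata.segmentBaseOffset == hwmMetadata.segmentBaseOffset) {
            // Both offsets are on the same segment, so positionDiff gives the
            // exact byte distance between their file positions.
            return Math.min(hwmMetadata.positionDiff(endOffsetMetadata), maxBytes);
        }
        // The offsets span segments, so at least a full segment of data is
        // available; the fetch can return up to maxBytes.
        return maxBytes;
    }
}
```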
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]