adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1810810655
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +197,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        boolean minBytesSatisfied = false;
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
Review Comment:
Hi @junrao, agreed, this is a much better approach. Just one clarification:
> This requires us to maintain the file position for SharePartition.endOffset

You mean the file position of the latest offset that was fetched for the
share partition, right?