adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1806619158
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,6 +187,58 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
+ /**
+ * Prepare partitions fetch data structure for acquirable partitions in
the share fetch request satisfying minBytes criteria.
+ */
+ Map<TopicIdPartition, FetchPartitionData>
replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData,
+ boolean
hasRequestTimedOut) {
+ log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}", topicPartitionData,
+ shareFetchData.groupId(), shareFetchData.fetchParams());
+ Map<TopicIdPartition, FetchPartitionData>
replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
+ Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
+ try {
+ Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
+ shareFetchData.fetchParams(),
+ CollectionConverters.asScala(
+ topicPartitionData.entrySet().stream().map(entry ->
+ new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
+ ),
+ QuotaFactory.UnboundedQuota$.MODULE$,
+ true);
+
+ AtomicInteger accumulatedBytes = new AtomicInteger(0);
+
+ responseLogResult.foreach(tpLogResult -> {
+ TopicIdPartition topicIdPartition = tpLogResult._1();
+ LogReadResult logResult = tpLogResult._2();
+ FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
+ responseData.put(topicIdPartition, fetchPartitionData);
+
accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
Review Comment:
Hi @apoorvmittal10 , IIUC, minBytes is utilized in
`replicaManager.fetchMessages` functionality not in
`replicaManager.readFromLog`
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1729).
The way it calculates the `accumulatedBytes` is the same way I have done it in
my code ([original code
reference](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1718)).
I don't see the usage of `params.minBytes` in `readFromLog` functionality
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]