adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1912173308


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
     }
 
     // Visible for testing.
-    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
                                                                LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
-                missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+                missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+
+        // Computing the total bytes that has already been fetched for the existing fetched data.
+        int totalPartitionMaxBytesUsed = 0;
+        for (LogReadResult logReadResult : existingFetchedData.values()) {
+            totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
+        }
+

Review Comment:
   @apoorvmittal10, Yes, we can do that as well. But I believe this dynamic approach of computing `partitionMaxBytes` for the leftover partitions, when we already know how many bytes were fetched for the other partitions, is better than blindly dividing it equally. Even if we set `partitionMaxBytes` of some partitions to 2MB, they may not have enough data produced in them, so we might only fetch, for example, 0.5MB for those partitions. With the dynamic approach, we give ourselves the opportunity to fetch more data from the leftover partitions (which would be beneficial in the scenario where they have a heavier produce rate than the former partitions).
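
   The idea above can be sketched as follows. This is a minimal, hypothetical illustration: the names `remainingPartitionMaxBytes`, `requestMaxBytes`, and `alreadyFetchedBytes`, and the even split of the remainder, are assumptions for the example, not the exact logic in the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionMaxBytesSketch {

    // Hypothetical sketch: subtract the bytes already fetched from the
    // request's total byte budget, then divide the remainder equally among
    // the partitions that still need a log read. Partitions that returned
    // less data than their cap effectively donate their unused budget.
    static LinkedHashMap<String, Integer> remainingPartitionMaxBytes(
            int requestMaxBytes,
            Map<String, Integer> alreadyFetchedBytes,
            List<String> leftoverPartitions) {
        int used = 0;
        for (int bytes : alreadyFetchedBytes.values()) {
            used += bytes;
        }
        int remaining = Math.max(0, requestMaxBytes - used);
        int perPartition = leftoverPartitions.isEmpty() ? 0 : remaining / leftoverPartitions.size();
        LinkedHashMap<String, Integer> limits = new LinkedHashMap<>();
        for (String tp : leftoverPartitions) {
            limits.put(tp, perPartition);
        }
        return limits;
    }

    public static void main(String[] args) {
        // tp-0 was capped at 2MB but only 0.5MB was actually available,
        // so tp-1 and tp-2 share the unused budget instead of a fixed cap.
        Map<String, Integer> fetched = Map.of("tp-0", 500_000);
        System.out.println(remainingPartitionMaxBytes(4_000_000, fetched, List.of("tp-1", "tp-2")));
    }
}
```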



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
