adixitconfluent commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2466435461


##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -57,8 +62,43 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
 
     private static LinkedHashMap<TopicIdPartition, Integer> 
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, 
int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, 
acquiredPartitionsSize);
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within 
partitions. If there are extra bytes left post
+            // dividing it uniformly, assign them randomly to any one of the 
partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, 
acquiredPartitionsSize);
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: requestMaxBytes can be distributed greedily in this 
scenario to prevent any starvation. If
+            // there are extra bytes left post dividing it uniformly, assign them 
randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, 
partitions.size());
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many 
partitions as possible, randomly, to avoid starvation.
+            LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 
0));
+            List<TopicIdPartition> partitionsList = new 
ArrayList<>(partitions);
+            int index = RANDOM.nextInt(partitionsList.size());
+            int count = 0;
+            while (count < requestMaxBytes) {
+                partitionMaxBytes.put(partitionsList.get(index), 1);
+                count += 1;
+                index = (index + 1) % partitionsList.size();
+            }
+            return partitionMaxBytes;
+        }
+    }
+
+    private static LinkedHashMap<TopicIdPartition, Integer> 
allotUniformBytesToPartitions(
+        Set<TopicIdPartition> partitions,
+        int requestMaxBytes,
+        int partitionsSize) {
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new 
LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
requestMaxBytes / acquiredPartitionsSize));
+        int uniformPartitionBytes = requestMaxBytes / partitionsSize;
+        int remainingBytes = requestMaxBytes % partitionsSize;
+        partitions.forEach(partition -> partitionMaxBytes.put(partition, 
uniformPartitionBytes));
+        if (remainingBytes != 0) {
+            List<TopicIdPartition> partitionsList = new 
ArrayList<>(partitionMaxBytes.keySet());

Review Comment:
   Implemented suggestion 2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to