apoorvmittal10 commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2465887866


##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -57,8 +62,43 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
 
     private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, acquiredPartitionsSize);
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: requestMaxBytes can be distributed greedily in this scenario to prevent any starvation. If
+            // there is extra bytes left post dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, partitions.size());
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many partitions possible randomly to avoid starvation.
+            LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 0));
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitions);
+            int index = RANDOM.nextInt(partitionsList.size());
+            int count = 0;
+            while (count < requestMaxBytes) {
+                partitionMaxBytes.put(partitionsList.get(index), 1);
+                count += 1;
+                index = (index + 1) % partitionsList.size();
+            }
+            return partitionMaxBytes;
+        }
+    }
+
+    private static LinkedHashMap<TopicIdPartition, Integer> allotUniformBytesToPartitions(
+        Set<TopicIdPartition> partitions,
+        int requestMaxBytes,
+        int partitionsSize) {
         LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
-        partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize));
+        int uniformPartitionBytes = requestMaxBytes / partitionsSize;
+        int remainingBytes = requestMaxBytes % partitionsSize;
+        partitions.forEach(partition -> partitionMaxBytes.put(partition, uniformPartitionBytes));
+        if (remainingBytes != 0) {
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitionMaxBytes.keySet());

Review Comment:
   This seems unnecessary and is on the critical path of every request. We should avoid it.
   
   There are multiple better ways:
   1. Just put the remaining bytes on the first partition (doesn't seem like a problem to me).
   2. Choose a random number and, while iterating `partitions.forEach`, when you encounter the partition chosen by `RANDOM.nextInt`, just put those bytes there.
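   The second suggestion could be sketched roughly as below. This is a standalone illustration, not the PR's actual code: `String` stands in for `TopicIdPartition`, and the class and method names are hypothetical.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Random;
   import java.util.Set;
   
   public class RemainderSketch {
       private static final Random RANDOM = new Random();
   
       // Distribute requestMaxBytes uniformly and hand the remainder to one
       // randomly chosen partition during the same single pass over the set,
       // avoiding a second ArrayList allocation on the request hot path.
       static LinkedHashMap<String, Integer> allot(Set<String> partitions, int requestMaxBytes) {
           int size = partitions.size();
           int uniform = requestMaxBytes / size;
           int remaining = requestMaxBytes % size;
           int chosen = RANDOM.nextInt(size);  // index of the partition that receives the remainder
   
           LinkedHashMap<String, Integer> result = new LinkedHashMap<>();
           int i = 0;
           for (String partition : partitions) {
               result.put(partition, i == chosen ? uniform + remaining : uniform);
               i++;
           }
           return result;
       }
   }
   ```
   
   Every partition still gets the uniform share; only the one partition at the randomly chosen position gets the extra `requestMaxBytes % size` bytes, so the totals sum back to `requestMaxBytes` without any extra list copy.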



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -57,8 +62,43 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
 
     private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, acquiredPartitionsSize);
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: requestMaxBytes can be distributed greedily in this scenario to prevent any starvation. If
+            // there is extra bytes left post dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, partitions.size());
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many partitions possible randomly to avoid starvation.
+            LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 0));
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitions);
+            int index = RANDOM.nextInt(partitionsList.size());
+            int count = 0;
+            while (count < requestMaxBytes) {
+                partitionMaxBytes.put(partitionsList.get(index), 1);
+                count += 1;
+                index = (index + 1) % partitionsList.size();
+            }
+            return partitionMaxBytes;
+        }
+    }
+
+    private static LinkedHashMap<TopicIdPartition, Integer> allotUniformBytesToPartitions(
+        Set<TopicIdPartition> partitions,
+        int requestMaxBytes,
+        int partitionsSize) {

Review Comment:
   nit
   ```suggestion
           int partitionsSize
           ) {
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -57,8 +62,43 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
 
     private static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
         checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize);
+        if (requestMaxBytes >= acquiredPartitionsSize) {
+            // Case 1: requestMaxBytes can be evenly distributed within partitions. If there is extra bytes left post
+            // dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, acquiredPartitionsSize);
+        } else if (requestMaxBytes >= partitions.size()) {
+            // Case 2: requestMaxBytes can be distributed greedily in this scenario to prevent any starvation. If
+            // there is extra bytes left post dividing it uniformly, assign it randomly to any one of the partitions.
+            return allotUniformBytesToPartitions(partitions, requestMaxBytes, partitions.size());
+        } else {
+            // Case 3: we will distribute requestMaxBytes to as many partitions possible randomly to avoid starvation.
+            LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
+            partitions.forEach(partition -> partitionMaxBytes.put(partition, 0));
+            List<TopicIdPartition> partitionsList = new ArrayList<>(partitions);

Review Comment:
   Again not needed; this can be simplified. You don't need a new ArrayList allocation to choose a partition.
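   One allocation-free way to do Case 3 could look like the sketch below: pick a random start offset, then assign one byte to each partition whose position falls inside the wrapping window of size `requestMaxBytes` during a single iteration of the set. Again a standalone illustration with hypothetical names; `String` stands in for `TopicIdPartition`.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Random;
   import java.util.Set;
   
   public class StarvationSketch {
       private static final Random RANDOM = new Random();
   
       // Case 3 (requestMaxBytes < partitions.size()): give 1 byte each to a
       // random contiguous window of partitions, wrapping around the set's
       // iteration order, without copying the set into an ArrayList first.
       static LinkedHashMap<String, Integer> allotOneByteEach(Set<String> partitions, int requestMaxBytes) {
           int size = partitions.size();
           int start = RANDOM.nextInt(size);  // first partition position to receive a byte
           LinkedHashMap<String, Integer> result = new LinkedHashMap<>();
           int i = 0;
           for (String partition : partitions) {
               // Position i gets a byte iff it falls in the window [start, start + requestMaxBytes), mod size.
               int offset = (i - start + size) % size;
               result.put(partition, offset < requestMaxBytes ? 1 : 0);
               i++;
           }
           return result;
       }
   }
   ```
   
   The result is the same as the PR's loop (exactly `requestMaxBytes` partitions receive 1 byte, starting at a random position), but with a single pass and no intermediate list.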



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
