adixitconfluent commented on code in PR #20746:
URL: https://github.com/apache/kafka/pull/20746#discussion_r2466435461
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java:
##########
@@ -57,8 +62,43 @@ static PartitionMaxBytesStrategy type(StrategyType type) {
private static LinkedHashMap<TopicIdPartition, Integer>
uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions,
int acquiredPartitionsSize) {
checkValidArguments(requestMaxBytes, partitions,
acquiredPartitionsSize);
+ if (requestMaxBytes >= acquiredPartitionsSize) {
+ // Case 1: requestMaxBytes can be evenly distributed among the
partitions. If there are extra bytes left after
+ // dividing it uniformly, assign them randomly to any one of the
partitions.
+ return allotUniformBytesToPartitions(partitions, requestMaxBytes,
acquiredPartitionsSize);
+ } else if (requestMaxBytes >= partitions.size()) {
+ // Case 2: requestMaxBytes can be distributed greedily in this
scenario to prevent any starvation. If
+ // there are extra bytes left after dividing it uniformly, assign them
randomly to any one of the partitions.
+ return allotUniformBytesToPartitions(partitions, requestMaxBytes,
partitions.size());
+ } else {
+ // Case 3: we will distribute requestMaxBytes to as many
partitions as possible randomly to avoid starvation.
+ LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new
LinkedHashMap<>();
+ partitions.forEach(partition -> partitionMaxBytes.put(partition,
0));
+ List<TopicIdPartition> partitionsList = new
ArrayList<>(partitions);
+ int index = RANDOM.nextInt(partitionsList.size());
+ int count = 0;
+ while (count < requestMaxBytes) {
+ partitionMaxBytes.put(partitionsList.get(index), 1);
+ count += 1;
+ index = (index + 1) % partitionsList.size();
+ }
+ return partitionMaxBytes;
+ }
+ }
+
+ private static LinkedHashMap<TopicIdPartition, Integer>
allotUniformBytesToPartitions(
+ Set<TopicIdPartition> partitions,
+ int requestMaxBytes,
+ int partitionsSize) {
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new
LinkedHashMap<>();
- partitions.forEach(partition -> partitionMaxBytes.put(partition,
requestMaxBytes / acquiredPartitionsSize));
+ int uniformPartitionBytes = requestMaxBytes / partitionsSize;
+ int remainingBytes = requestMaxBytes % partitionsSize;
+ partitions.forEach(partition -> partitionMaxBytes.put(partition,
uniformPartitionBytes));
+ if (remainingBytes != 0) {
+ List<TopicIdPartition> partitionsList = new
ArrayList<>(partitionMaxBytes.keySet());
Review Comment:
implemented 2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]