[ https://issues.apache.org/jira/browse/KAFKA-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Bradstreet updated KAFKA-7410:
------------------------------------
    Description: 
AdminUtils creates a bad partition assignment when the number of brokers on each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers rack C. Under such a scenario, a single broker from rack C may be allocated more frequently than expected.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list of brokers alternating by rack. However, once it runs out of brokers on the racks with fewer brokers, it ends up generating a run of brokers from the same rack, because rackIterator.hasNext returns false for the other racks.

{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
    result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}

Once assignReplicasToBrokersRackAware encounters the run of brokers from the same rack while trying to maintain the rack invariant, it skips all of the C brokers until it wraps around to the first broker in the alternated list, and chooses that broker if it is on a different rack. Note how the code below skips over the run when choosing the replicas.

{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
{code}

It repeats this behavior for each of the remaining brokers for C, each time choosing the first broker in the alternated list, until it has allocated all of the partitions.

See the attached sample code for more details.

  was:
AdminUtils creates a bad partition assignment when the number of brokers on each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers rack C.
Under such a scenario, a single broker from rack C may be assigned over and over again, when more balanced allocations exist.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list of brokers alternating by rack, however once it runs out of brokers on the racks with fewer brokers, it ends up placing a run of brokers from the same rack together as rackIterator.hasNext will return false for the other racks.

{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
    result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}

Once assignReplicasToBrokersRackAware hits the run of brokers from the same rack, when choosing the replicas to go along with the leader on the rack with the most brokers e.g. C, it will skip all of the C brokers until it wraps around to the first broker in the alternated list, and choose the first broker in the alternated list.

{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
{code}

It does so for each of the remaining brokers for C, choosing the first broker in the alternated list until it's allocated all of the partitions.

See the attached sample code for more details.
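The run described above can be reproduced in isolation. The following is a minimal sketch, not the Kafka source: it reuses the loop quoted in the description, but the object name RackAlternationSketch, the helper trailingRunLength, the broker ids, and the way brokersIteratorByRack and racks are built are all assumptions made for illustration. With the 80/20/15 split from the description, the list alternates while every rack still has brokers and then ends in a long run from the largest rack.

```scala
import scala.collection.mutable

object RackAlternationSketch {
  // Hypothetical stand-in for getRackAlternatedBrokerList. The setup of
  // brokersIteratorByRack and racks is an assumption; only the while loop
  // is taken from the snippet quoted in the issue.
  def rackAlternatedBrokerList(brokerRackMap: Map[Int, String]): Vector[Int] = {
    // One iterator per rack, yielding that rack's broker ids in ascending order.
    val brokersIteratorByRack: Map[String, Iterator[Int]] =
      brokerRackMap
        .groupBy { case (_, rack) => rack }
        .map { case (rack, brokers) => rack -> brokers.keys.toSeq.sorted.iterator }
    val racks = brokersIteratorByRack.keys.toArray.sorted
    val result = mutable.ArrayBuffer.empty[Int]
    var rackIndex = 0
    while (result.size < brokerRackMap.size) {
      val rackIterator = brokersIteratorByRack(racks(rackIndex))
      // Exhausted racks are silently skipped, which is what produces the run.
      if (rackIterator.hasNext)
        result += rackIterator.next()
      rackIndex = (rackIndex + 1) % racks.length
    }
    result.toVector
  }

  // Number of consecutive brokers at the end of the list that share a rack.
  def trailingRunLength(brokers: Seq[Int], brokerRackMap: Map[Int, String]): Int = {
    val lastRack = brokerRackMap(brokers.last)
    brokers.reverseIterator.takeWhile(b => brokerRackMap(b) == lastRack).size
  }

  // Rack sizes from the issue description; the broker ids are made up.
  def exampleRackMap: Map[Int, String] =
    ((0 until 80).map(_ -> "A") ++
      (80 until 100).map(_ -> "B") ++
      (100 until 115).map(_ -> "C")).toMap
}
```

Calling RackAlternationSketch.rackAlternatedBrokerList(RackAlternationSketch.exampleRackMap) and passing the result to trailingRunLength shows how many same-rack brokers sit back to back at the end of the list. Any replica search that has to honor the rack invariant must step over that whole run before it finds a broker on a different rack.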
> Rack aware partition assignment creates highly unbalanced broker assignments on unbalanced racks
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7410
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7410
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 1.1.1
>            Reporter: Lucas Bradstreet
>            Priority: Major
>         Attachments: AdminUtilsTest.scala
>
>
> AdminUtils creates a bad partition assignment when the number of brokers on
> each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15
> brokers rack C. Under such a scenario, a single broker from rack C may be
> allocated more frequently than expected.
> kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a
> list of brokers alternating by rack. However, once it runs out of brokers on
> the racks with fewer brokers, it ends up generating a run of brokers from the
> same rack, because rackIterator.hasNext returns false for the other racks.
> {code:java}
> while (result.size < brokerRackMap.size) {
>   val rackIterator = brokersIteratorByRack(racks(rackIndex))
>   if (rackIterator.hasNext)
>     result += rackIterator.next()
>   rackIndex = (rackIndex + 1) % racks.length
> }{code}
> Once assignReplicasToBrokersRackAware encounters the run of brokers from the
> same rack while trying to maintain the rack invariant, it skips all of the C
> brokers until it wraps around to the first broker in the alternated list, and
> chooses that broker if it is on a different rack. Note how the code below
> skips over the run when choosing the replicas.
> {code:java}
> if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
>     && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
>   replicaBuffer += broker
>   racksWithReplicas += rack
>   brokersWithReplicas += broker
>   done = true
> }
> k += 1
> {code}
> It repeats this behavior for each of the remaining brokers for C, each time
> choosing the first broker in the alternated list until it has allocated all
> of the partitions.
> See the attached sample code for more details.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)