ableegoldman commented on code in PR #13965: URL: https://github.com/apache/kafka/pull/13965#discussion_r1599471029
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -739,8 +750,13 @@ private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0); if (firstIndex >= 0) { consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex); - if (assignment.get(consumer).size() + 1 == maxQuota) + if (assignment.get(consumer).size() + 1 == maxQuota) { unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex); + currentNumMembersWithOverMinQuotaPartitions++; + if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) { Review Comment: Let's add a comment above this line, too. Something like ``` // Clear this once the current num consumers over minQuota reaches the expected number since this // means all consumers at minQuota are now considered filled ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -729,8 +739,9 @@ private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions int assignmentCount = assignment.get(consumer).size() + 1; if (assignmentCount >= minQuota) { unfilledMembersWithUnderMinQuotaPartitions.remove(consumer); - if (assignmentCount < maxQuota) + if (assignmentCount < maxQuota && (currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions)) { Review Comment: This fix makes sense, good find. But let's add a comment because obviously this is difficult to understand if the bug slipped in. Something like: ``` // Only add this consumer if the current num members at maxQuota is less than the expected number // since a consumer at minQuota can only be considered unfilled if it's possible to add another partition, // which would bump it to maxQuota and exceed the expectedNumMembersWithOverMinQuotaPartitions ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ########## @@ -579,7 +579,7 @@ protected List<TopicPartition> getAllTopicPartitions(List<String> sortedAllTopic private class ConstrainedAssignmentBuilder extends AbstractAssignmentBuilder { private final Set<TopicPartition> partitionsWithMultiplePreviousOwners; - private final Set<TopicPartition> allRevokedPartitions; + private final Map<TopicPartition, String> mayRevokedPartitions; Review Comment: nit: this name is a bit confusing, how about `maybeRevokedPartitions`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org