ableegoldman commented on code in PR #13965:
URL: https://github.com/apache/kafka/pull/13965#discussion_r1599471029


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -739,8 +750,13 @@ private void 
assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions
                     int firstIndex = 
rackInfo.nextRackConsumer(unassignedPartition, 
unfilledMembersWithExactlyMinQuotaPartitions, 0);
                     if (firstIndex >= 0) {
                         consumer = 
unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
-                        if (assignment.get(consumer).size() + 1 == maxQuota)
+                        if (assignment.get(consumer).size() + 1 == maxQuota) {
                             
unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+                            currentNumMembersWithOverMinQuotaPartitions++;
+                            if (currentNumMembersWithOverMinQuotaPartitions == 
expectedNumMembersWithOverMinQuotaPartitions) {

Review Comment:
   Let's add a comment above this line, too. Something like
   
   ```
   // Clear this once the current num consumers over minQuota reaches the 
expected number since this
   // means all consumers at minQuota are now considered filled
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -729,8 +739,9 @@ private void assignRackAwareRoundRobin(List<TopicPartition> 
unassignedPartitions
                     int assignmentCount = assignment.get(consumer).size() + 1;
                     if (assignmentCount >= minQuota) {
                         
unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
-                        if (assignmentCount < maxQuota)
+                        if (assignmentCount < maxQuota && 
(currentNumMembersWithOverMinQuotaPartitions < 
expectedNumMembersWithOverMinQuotaPartitions)) {

Review Comment:
   This fix makes sense, good find. But let's add a comment because obviously 
this is difficult to understand if the bug slipped in.
   
   Something like:
   ```
   // Only add this consumer if the current num members at maxQuota is less 
than the expected number
   // since a consumer at minQuota can only be considered unfilled if it's 
possible to add another partition,
   // which would bump it to maxQuota and exceed the 
expectedNumMembersWithOverMinQuotaPartitions
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -579,7 +579,7 @@ protected List<TopicPartition> 
getAllTopicPartitions(List<String> sortedAllTopic
     private class ConstrainedAssignmentBuilder extends 
AbstractAssignmentBuilder {
 
         private final Set<TopicPartition> partitionsWithMultiplePreviousOwners;
-        private final Set<TopicPartition> allRevokedPartitions;
+        private final Map<TopicPartition, String> mayRevokedPartitions;

Review Comment:
   nit: this name is a bit confusing, how about `maybeRevokedPartitions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to