guozhangwang commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668293781



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -186,16 +209,25 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
 
+            for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "

Review comment:
       Same here: with the current code this should never happen, so how about logging it at ERROR?
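Below is a minimal sketch of the invalidation step logged at ERROR level. It makes a few assumptions. Partitions are modeled as plain `String`s (e.g. `"t-0"`) instead of Kafka's `TopicPartition`. `java.util.logging`'s `SEVERE` stands in for the slf4j ERROR level the assignor actually logs through. `DoublyClaimedFilter` and `dropDoublyClaimed` are hypothetical names, and the log text is illustrative, not the PR's message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical helper (not part of the PR): partitions are plain Strings such as "t-0".
final class DoublyClaimedFilter {
    private static final Logger LOG = Logger.getLogger(DoublyClaimedFilter.class.getName());

    // Returns a copy of ownedPartitions without any partition that multiple consumers
    // claimed, logging each dropped partition at SEVERE (the closest match to ERROR).
    static List<String> dropDoublyClaimed(String consumer,
                                          List<String> ownedPartitions,
                                          Set<String> partitionsWithMultiplePreviousOwners) {
        List<String> kept = new ArrayList<>(ownedPartitions.size());
        for (String tp : ownedPartitions) {
            if (partitionsWithMultiplePreviousOwners.contains(tp)) {
                LOG.severe(String.format(
                    "Partition %s is claimed by consumer %s but also by other consumers; dropping it",
                    tp, consumer));
            } else {
                kept.add(tp);
            }
        }
        return kept;
    }
}
```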

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
                 // this consumer is potential maxQuota candidate since we're still under the number of expected members
                 // with more than the minQuota partitions. Note, if the number of expected members with more than
-                // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+                // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
                 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                    unfilledMembers.add(consumer);
+                    potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
       Honestly, it took me quite a while to understand the fix :P Now that I understand it, I think it would be better to rename these two collections more explicitly:
   
   1) `unfilledMembers` -> `MembersWithLessThanMinQuotaPartitions`
   2) `potentiallyUnfilledMembersAtMinQuota` -> `MembersWithExactMinQuotaPartitions`
   
   And also (since maxQuota is always either equal to minQuota or minQuota + 1):
   
   3) `expectedNumMembersAssignedOverMinQuota` -> `expectedNumMembersWithMaxQuota`
   4) `numMembersAssignedOverMinQuota` -> `numMembersWithMaxQuota`
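This small sketch shows why the `WithMaxQuota` names fit. `minQuota` is the floor of partitions per consumer and `maxQuota` is the ceiling. So `maxQuota` can only be `minQuota` or `minQuota + 1`, and the remainder tells us how many members get the extra partition. `Quotas` is a hypothetical holder class used only for illustration, not something in the assignor.

```java
// Hypothetical holder for the per-member quota arithmetic discussed above.
final class Quotas {
    final int minQuota;
    final int maxQuota;
    final int expectedNumMembersWithMaxQuota;
    final int expectedNumMembersWithMinQuota;

    Quotas(int totalPartitionsCount, int numberOfConsumers) {
        if (numberOfConsumers <= 0 || totalPartitionsCount < 0) {
            throw new IllegalArgumentException("need at least one consumer and a non-negative partition count");
        }
        // Integer division is the floor here because both operands are non-negative
        this.minQuota = totalPartitionsCount / numberOfConsumers;
        int remainder = totalPartitionsCount % numberOfConsumers;
        // The ceiling: one extra partition only when the split is uneven
        this.maxQuota = remainder == 0 ? minQuota : minQuota + 1;
        // When remainder == 0, maxQuota == minQuota and every member counts as "min"
        this.expectedNumMembersWithMaxQuota = remainder;
        this.expectedNumMembersWithMinQuota = numberOfConsumers - remainder;
    }
}
```

For example, 10 partitions over 3 consumers gives minQuota = 3 and maxQuota = 4, with one member at maxQuota and two at minQuota (3 + 3 + 4 = 10).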

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##########
@@ -555,7 +558,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() {
         assignor.assign(partitionsPerTopic, subscriptions);
     }
 
-    @Timeout(40)
+    @Timeout(60)

Review comment:
       +1

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-                        ownedPartitions.add(tp);
+
+                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            ownedPartitions.add(tp);
+                        } else {
+                            String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+                            log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
       nit: do you think we should log at ERROR, since this is not really expected? Right now we would sort of "hide" such bugs and proceed silently; I feel we should call out such scenarios more loudly in the logs.
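Here is a rough sketch of detecting multiple claims with a single map operation per partition and reporting each one at ERROR level. It assumes `String` partitions, with `java.util.logging` (`SEVERE`) in place of `TopicPartition` and slf4j. It only collects the conflicting partitions. The hunk above is cut off inside the `else` branch, so whatever the PR does with them after logging is left out here. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch: the input maps each consumer to the partitions it reports as owned.
final class DuplicateClaimDetector {
    private static final Logger LOG = Logger.getLogger(DuplicateClaimDetector.class.getName());

    // Returns every partition claimed by more than one consumer, logging each conflict at SEVERE.
    static Set<String> findMultiplyClaimed(Map<String, List<String>> claimedByConsumer) {
        Map<String, String> allPreviousPartitionsToOwner = new HashMap<>();
        Set<String> partitionsWithMultiplePreviousOwners = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : claimedByConsumer.entrySet()) {
            String consumer = entry.getKey();
            for (String tp : entry.getValue()) {
                // putIfAbsent returns the earlier owner, or null if tp was not claimed yet
                String otherConsumer = allPreviousPartitionsToOwner.putIfAbsent(tp, consumer);
                if (otherConsumer != null && !otherConsumer.equals(consumer)) {
                    LOG.severe(String.format(
                        "Found multiple consumers %s and %s claiming the same partition %s",
                        otherConsumer, consumer, tp));
                    partitionsWithMultiplePreviousOwners.add(tp);
                }
            }
        }
        return partitionsWithMultiplePreviousOwners;
    }
}
```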

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
                 // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
                 numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
       I'd suggest we remove this `potentiallyUnfilledMembersAtMinQuota.clear();` logic and just keep two expected numbers:
   
   1) expectedNumMembersWithMaxQuota = totalPartitionsCount % numberOfConsumers;
   2) expectedNumMembersWithMinQuota = numberOfConsumers - expectedNumMembersWithMaxQuota;
   
   Then we can also remove the check in line 309, and at the end, once we have exhausted `unassignedPartitions`, just check
   
   ```
   numMembersWithMaxQuota == expectedNumMembersWithMaxQuota &&
   numMembersWithMinQuota == expectedNumMembersWithMinQuota
   ```
   
   WDYT?
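This is a sketch of the suggested shape. It assumes three things going in: each member already holds at most `maxQuota` partitions, no more than `expectedNumMembersWithMaxQuota` of them are already at `maxQuota`, and the unassigned partitions exactly fill the remaining slots. It does not clear a "potentially unfilled" collection. Instead, it only counts how many members have reached `maxQuota` and compares that count with the expected number. `QuotaFill` is a hypothetical name, and partitions are plain `String`s.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: fills members below minQuota round-robin, then tops up members
// sitting at exactly minQuota until expectedNumMembersWithMaxQuota members hold maxQuota.
final class QuotaFill {
    static void fill(Map<String, List<String>> assignment, List<String> unassignedPartitions,
                     int minQuota, int maxQuota, int expectedNumMembersWithMaxQuota) {
        Deque<String> membersBelowMinQuota = new ArrayDeque<>();
        Deque<String> membersAtMinQuota = new ArrayDeque<>();
        int numMembersWithMaxQuota = 0;
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            numMembersWithMaxQuota += track(entry.getKey(), entry.getValue().size(),
                minQuota, maxQuota, membersBelowMinQuota, membersAtMinQuota);
        }
        for (String tp : unassignedPartitions) {
            String consumer;
            if (!membersBelowMinQuota.isEmpty()) {
                consumer = membersBelowMinQuota.poll();
            } else if (numMembersWithMaxQuota < expectedNumMembersWithMaxQuota && !membersAtMinQuota.isEmpty()) {
                consumer = membersAtMinQuota.poll();
            } else {
                throw new IllegalStateException("No member can take partition " + tp);
            }
            List<String> owned = assignment.get(consumer);
            owned.add(tp);
            numMembersWithMaxQuota += track(consumer, owned.size(),
                minQuota, maxQuota, membersBelowMinQuota, membersAtMinQuota);
        }
    }

    // Queues a member according to its current size; returns 1 if it now holds maxQuota
    // (counted only when maxQuota > minQuota), otherwise 0.
    private static int track(String consumer, int size, int minQuota, int maxQuota,
                             Deque<String> belowMin, Deque<String> atMin) {
        if (size < minQuota) {
            belowMin.add(consumer);   // appended at the tail, which gives round-robin order
        } else if (maxQuota > minQuota && size == minQuota) {
            atMin.add(consumer);
        } else if (maxQuota > minQuota && size == maxQuota) {
            return 1;
        }
        return 0;
    }
}
```

With the counter in place, the end-of-loop condition from the comment above is simply `numMembersWithMaxQuota == expectedNumMembersWithMaxQuota`.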

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
+                              unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-            if (currentAssignedCount == expectedAssignedCount) {
-                if (currentAssignedCount == maxQuota) {
-                    numMembersAssignedOverMinQuota++;
-                }
+            if (currentAssignedCount == minQuota) {
                 unfilledConsumerIter.remove();
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);
+            } else if (currentAssignedCount == maxQuota) {
+                numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled
+                    // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we
+                    // should be done. But in case of some algorithmic error, just log a warning and continue to
+                    // assign any remaining partitions within the assignment constraints
+                    if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) {
+                        log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, "

Review comment:
       Related to the one above: maybe we can just check that
   
   ```
   numMembersWithMaxQuota == expectedNumMembersWithMaxQuota &&
   numMembersWithMinQuota == expectedNumMembersWithMinQuota
   ```
   
   And if that doesn't hold, log the full assignment at ERROR?
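Here is one way the final check could look, with the whole assignment logged when the counts are off. `String` partitions and `java.util.logging` at `SEVERE` stand in for `TopicPartition` and slf4j's ERROR level. `AssignmentCheck.verify` is a hypothetical name, and the expected counts are passed in rather than computed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical final check: counts members at minQuota and at maxQuota and logs the full
// assignment at SEVERE (java.util.logging's counterpart of ERROR) if either count is wrong.
final class AssignmentCheck {
    private static final Logger LOG = Logger.getLogger(AssignmentCheck.class.getName());

    static boolean verify(Map<String, List<String>> assignment, int minQuota, int maxQuota,
                          int expectedNumMembersWithMinQuota, int expectedNumMembersWithMaxQuota) {
        int numMembersWithMinQuota = 0;
        int numMembersWithMaxQuota = 0;
        for (List<String> partitions : assignment.values()) {
            int size = partitions.size();
            // When maxQuota == minQuota, every full member is counted in the "min" bucket
            if (maxQuota > minQuota && size == maxQuota) {
                numMembersWithMaxQuota++;
            } else if (size == minQuota) {
                numMembersWithMinQuota++;
            }
        }
        boolean ok = numMembersWithMaxQuota == expectedNumMembersWithMaxQuota
            && numMembersWithMinQuota == expectedNumMembersWithMinQuota;
        if (!ok) {
            LOG.severe("Assignment does not satisfy the expected quotas: " + assignment);
        }
        return ok;
    }
}
```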

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -121,7 +129,12 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
                 // If the current member's generation is higher, all the previously owned partitions are invalid
                 if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    allPreviousPartitionsToOwner.clear();

Review comment:
       If the current member's generation is available but `<` maxGeneration, should we also clear it from the `consumerToOwnedPartitions` map? I think the passed-in `subscriptions` are not sorted by generation, right?
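If the input order can't be relied on, a two-pass version avoids the question entirely. First find the highest generation, then keep owned partitions only from members at that generation. This is a sketch of that alternative, not what the PR does. `MemberClaim`, `GenerationFilter`, and the `-1` default generation are assumptions made for illustration, and partitions are plain `String`s.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a member's user data: its last known generation (if any)
// and the partitions it claims to own.
final class MemberClaim {
    final Optional<Integer> generation;
    final List<String> partitions;

    MemberClaim(Optional<Integer> generation, List<String> partitions) {
        this.generation = generation;
        this.partitions = partitions;
    }
}

final class GenerationFilter {
    static final int DEFAULT_GENERATION = -1; // assumption: marks "no generation reported"

    // Pass 1 finds the highest generation; pass 2 keeps claims only from members at it,
    // so the result does not depend on the iteration order of the input map.
    static Map<String, List<String>> ownedAtMaxGeneration(Map<String, MemberClaim> members) {
        int maxGeneration = DEFAULT_GENERATION;
        for (MemberClaim m : members.values()) {
            maxGeneration = Math.max(maxGeneration, m.generation.orElse(DEFAULT_GENERATION));
        }
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        for (Map.Entry<String, MemberClaim> e : members.entrySet()) {
            int generation = e.getValue().generation.orElse(DEFAULT_GENERATION);
            List<String> owned = new ArrayList<>();
            if (generation == maxGeneration) {
                owned.addAll(e.getValue().partitions);
            }
            // Members from an older generation keep an entry, but with no owned partitions
            consumerToOwnedPartitions.put(e.getKey(), owned);
        }
        return consumerToOwnedPartitions;
    }
}
```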



