[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r669157613

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
         // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
         numMembersAssignedOverMinQuota++;
+        if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+            potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
Thanks @showuon and @guozhangwang, I think that all makes sense. One of my primary motivations was to keep all data structures consistent at all times with what they represent, so they can always be relied upon at any point. For that reason I also prefer to keep things as is, and clear the `potentiallyUnfilledMembersAtMinQuota` (now renamed to `unfilledMembersWithExactlyMinQuotaPartitions`) as soon as we have filled the last member who may be above `minQuota`, at which point all of the members at exactly `minQuota` are no longer considered "unfilled".

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
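For context, the quota bookkeeping these variables implement can be sketched as follows. This is a simplified standalone illustration, not the actual Kafka code; only the semantics mirror the `minQuota` / `maxQuota` / `expectedNumMembersAssignedOverMinQuota` variables in the diff:

```java
// Sketch of the constrained assignor's quota arithmetic (illustrative only).
public class QuotaMath {
    // Every consumer receives at least this many partitions.
    static int minQuota(int numPartitions, int numConsumers) {
        return numPartitions / numConsumers;
    }

    // No consumer receives more than this many partitions.
    static int maxQuota(int numPartitions, int numConsumers) {
        return (int) Math.ceil((double) numPartitions / numConsumers);
    }

    // Exactly this many consumers end up above minQuota (i.e. at maxQuota).
    // It is 0 when partitions divide evenly, in which case minQuota == maxQuota.
    static int expectedNumMembersOverMinQuota(int numPartitions, int numConsumers) {
        return numPartitions % numConsumers;
    }
}
```

For example, 7 partitions across 3 consumers gives `minQuota = 2`, `maxQuota = 3`, and exactly one member expected over `minQuota`; once that member is filled, everyone still holding exactly `minQuota` is done.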
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668410517

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
                         unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();

             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);

             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);

             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-            if (currentAssignedCount == expectedAssignedCount) {
-                if (currentAssignedCount == maxQuota) {
-                    numMembersAssignedOverMinQuota++;
-                }
+            if (currentAssignedCount == minQuota) {
                 unfilledConsumerIter.remove();
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);
+            } else if (currentAssignedCount == maxQuota) {
+                numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled
+                    // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we
+                    // should be done. But in case of some algorithmic error, just log a warning and continue to
+                    // assign any remaining partitions within the assignment constraints
+                    if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) {
+                        log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, "

Review comment:
I responded to the above comment as well, but specifically here I think that checking just that condition requires us to make assumptions about the algorithm's correctness up to this point (and the correctness of its assumptions). But if those are all correct then we would never reach this to begin with, so it's better to directly look for any remaining `unassignedPartitions` -- it's a sanity check.
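The round-robin fill phase discussed in this thread can be sketched as a simplified standalone loop. This is illustrative only, not the actual `AbstractStickyAssignor` code: partitions are plain integers, stickiness and ownership transfer are omitted, and the two queues stand in for `unfilledMembers` and `potentiallyUnfilledMembersAtMinQuota`:

```java
import java.util.*;

// Sketch of the fill phase: every consumer is filled to minQuota first; consumers
// at exactly minQuota stay "potentially unfilled" and absorb the remainder, one
// partition each, up to maxQuota. (Illustrative, not Kafka's actual code.)
public class RoundRobinFill {
    static Map<String, List<Integer>> fill(List<Integer> unassignedPartitions,
                                           List<String> consumers,
                                           int minQuota, int maxQuota) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());

        Deque<String> unfilled = new ArrayDeque<>(consumers);   // below minQuota
        Deque<String> atMinQuota = new ArrayDeque<>();          // may still get one more

        for (int partition : unassignedPartitions) {
            // prefer consumers below minQuota; fall back to those at exactly minQuota
            String consumer = !unfilled.isEmpty() ? unfilled.poll() : atMinQuota.poll();
            if (consumer == null)
                throw new IllegalStateException("No more unfilled consumers to be assigned.");

            List<Integer> owned = assignment.get(consumer);
            owned.add(partition);

            if (owned.size() < minQuota) {
                unfilled.add(consumer);            // still below minQuota: cycle back
            } else if (owned.size() == minQuota && minQuota < maxQuota) {
                atMinQuota.add(consumer);          // filled, unless some member must reach maxQuota
            }
            // otherwise the consumer is filled for good
        }
        return assignment;
    }
}
```

With 7 partitions and consumers A, B, C (`minQuota` 2, `maxQuota` 3), A, B, and C are each filled to 2 first, then the single leftover partition goes to the head of the at-minQuota queue.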
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668409463

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
         // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
         numMembersAssignedOverMinQuota++;
+        if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+            potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
While I'm not really a fan of the `potentiallyUnfilledMembersAtMinQuota` logic (it's definitely awkward, but I felt it was still the lesser evil in terms of complicating the code), I don't think we can get rid of it that easily. The problem is that when `minQuota != maxQuota`, and so far `currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions`, then consumers that are filled up to exactly `minQuota` have to be considered potentially not yet at capacity, since some will need one more partition, though not all. So this data structure is not just used to verify that everything is properly assigned after we've exhausted the `unassignedPartitions`; it's used to track which consumers can still receive another partition (i.e., are "unfilled"). Does that make sense?
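The "potentially unfilled" condition described in this comment can be written down as a small predicate. This is a hypothetical helper for illustration, not a method in the assignor; the parameter names only mirror the variables discussed above:

```java
// Sketch of the "unfilled" test: a consumer below minQuota is definitely unfilled;
// a consumer at exactly minQuota can still receive one more partition only while
// fewer than the expected number of members have gone over minQuota, and only
// when minQuota != maxQuota. (Illustrative helper, not part of the assignor.)
public class UnfilledCheck {
    static boolean potentiallyUnfilled(int assignedCount, int minQuota, int maxQuota,
                                       int membersOverMinQuota, int expectedMembersOverMinQuota) {
        if (assignedCount < minQuota)
            return true;                     // definitely still needs partitions
        return assignedCount == minQuota
            && minQuota < maxQuota           // when minQuota == maxQuota, minQuota members are full
            && membersOverMinQuota < expectedMembersOverMinQuota;
    }
}
```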
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
             // this consumer is potential maxQuota candidate since we're still under the number of expected members
             // with more than the minQuota partitions. Note, if the number of expected members with more than
-            // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+            // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
             if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                unfilledMembers.add(consumer);
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
For the 3rd & 4th suggested renamings, it's a bit subtle, but this would actually be incorrect. In the case `minQuota == maxQuota`, the `expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of course we could go through and tweak the logic for this case, but I'd prefer not to mix that into this PR. For now I'll just clarify in the comments for these variables. (I did still rename them slightly to hopefully be more clear, and also in line with the new names of the other two data structures we renamed.)
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406080

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
             // this consumer is potential maxQuota candidate since we're still under the number of expected members
             // with more than the minQuota partitions. Note, if the number of expected members with more than
-            // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+            // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
             if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                unfilledMembers.add(consumer);
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
Ack on the first two renamings, though I'd still want to prefix them with `unfilled` to emphasize that these structures only hold members that may potentially be assigned one or more partitions. I.e., if `minQuota == maxQuota`, then `potentiallyUnfilledMembersAtMinQuota` should actually be empty, in which case `MembersWithExactMinQuotaPartitions` doesn't quite make sense. I'll clarify this in the comments as well.
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402587

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             for (final TopicPartition tp : memberData.partitions) {
                 // filter out any topics that no longer exist or aren't part of the current subscription
                 if (allTopics.contains(tp.topic())) {
-                    ownedPartitions.add(tp);
+
+                    if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                        allPreviousPartitionsToOwner.put(tp, consumer);
+                        ownedPartitions.add(tp);
+                    } else {
+                        String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+                        log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
Good point, yes I would absolutely want/hope a user would report this as a bug. Changed to ERROR.
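The duplicate-ownership check in the diff above amounts to: the first consumer to claim a partition keeps it tentatively; a second claim in the same generation invalidates the partition for everyone. A simplified standalone sketch, with partitions as plain strings rather than `TopicPartition` (not the actual assignor code):

```java
import java.util.*;

// Sketch of detecting partitions claimed by more than one consumer in the same
// generation: first claimer is recorded, later claimers mark the partition as
// doubly claimed so neither side's claim is trusted. (Illustrative only.)
public class DuplicateClaims {
    static Set<String> findDoublyClaimed(Map<String, List<String>> claimedByConsumer) {
        Map<String, String> partitionToOwner = new HashMap<>();
        Set<String> doublyClaimed = new HashSet<>();
        for (Map.Entry<String, List<String>> e : claimedByConsumer.entrySet()) {
            for (String tp : e.getValue()) {
                String previous = partitionToOwner.putIfAbsent(tp, e.getKey());
                if (previous != null && !previous.equals(e.getKey())) {
                    doublyClaimed.add(tp); // two consumers claim it: trust neither
                }
            }
        }
        return doublyClaimed;
    }
}
```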
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402323

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -121,7 +129,12 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             // If the current member's generation is higher, all the previously owned partitions are invalid
             if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                allPreviousPartitionsToOwner.clear();

Review comment:
In that case, it's never added to `consumerToOwnedPartitions` in the first place. This map is not pre-filled, it gets populated inside this loop. So if it's `< maxGeneration`, then we just insert an empty list into the map for that member's owned partitions.
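The generation handling discussed here can be sketched as a standalone filter: only claims from the highest generation seen so far are kept, seeing a higher generation invalidates all earlier claims, and members on an older generation contribute an empty owned list. This is a simplified illustration (strings for partitions, an int per member for the generation), not the actual assignor code:

```java
import java.util.*;

// Sketch of generation-based claim invalidation. (Illustrative only.)
public class GenerationFilter {
    static Map<String, List<String>> keepHighestGeneration(Map<String, Integer> memberGeneration,
                                                           Map<String, List<String>> memberClaims) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        List<String> membersOfHighestGeneration = new ArrayList<>();
        int maxGeneration = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> e : memberGeneration.entrySet()) {
            String member = e.getKey();
            int generation = e.getValue();
            if (generation > maxGeneration) {
                // everything kept so far came from an older generation: drop those claims
                for (String old : membersOfHighestGeneration)
                    owned.put(old, new ArrayList<>());
                membersOfHighestGeneration.clear();
                maxGeneration = generation;
            }
            if (generation == maxGeneration) {
                membersOfHighestGeneration.add(member);
                owned.put(member, new ArrayList<>(
                    memberClaims.getOrDefault(member, Collections.emptyList())));
            } else {
                owned.put(member, new ArrayList<>()); // older generation: empty owned list
            }
        }
        return owned;
    }
}
```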
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665794341

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                    int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                    log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                    throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                }
-                unfilledConsumerIter = unfilledMembers.iterator();
            }
-            String consumer = unfilledConsumerIter.next();

             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);

             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);

Review comment:
That's a good idea. But WDYT about doing this in a small follow-up PR? Partly to keep this PR from growing too long (it's already much longer than I originally intended), and partly so we can get this one merged ASAP since it's a blocker. Actually, would you be interested in working on that test (or tests) yourself, after this PR can be merged? At this point I think you're probably the most familiar with this code outside of myself.
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665767593

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         // If the current member's generation is higher, all the previously owned partitions are invalid
         if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
             membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+            allPreviousPartitionsToOwner.clear();
+            partitionsWithMultiplePreviousOwners.clear();
+            for (String droppedOutConsumer : membersWithOldGeneration) {
+                consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+            }

Review comment:
Mm, good catch.
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665750661

File path: clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java

@@ -53,9 +56,34 @@ public String name() {
         return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
     }

+    @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
+        this.generation = metadata.generationId();
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        if (generation == DEFAULT_GENERATION) {

Review comment:
Yeahh... will do.
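The diff above carries the consumer's generation in the subscription userData so the assignor can tell stale ownership claims from current ones. The real `CooperativeStickyAssignor` serializes this with Kafka's own schema machinery; the plain `ByteBuffer` round trip below is only a sketch of the idea, with `DEFAULT_GENERATION` as an assumed sentinel value:

```java
import java.nio.ByteBuffer;

// Sketch of encoding a generation id into subscription userData and reading it
// back, falling back to a sentinel when no generation is known yet.
// (Illustrative only; not the actual Kafka serialization format.)
public class GenerationUserData {
    static final int DEFAULT_GENERATION = -1; // assumed sentinel for "unknown"

    static ByteBuffer encode(int generation) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
        buf.putInt(generation);
        buf.flip(); // ready for reading
        return buf;
    }

    static int decode(ByteBuffer userData) {
        if (userData == null || userData.remaining() < Integer.BYTES)
            return DEFAULT_GENERATION; // no userData: generation unknown
        return userData.getInt();
    }
}
```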
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665240093

File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java

@@ -555,7 +558,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() {
         assignor.assign(partitionsPerTopic, subscriptions);
     }

-    @Timeout(40)
+    @Timeout(60)

Review comment:
Saw this fail sometimes when running locally. This PR definitely should not have significantly affected the performance of this test in any way, since the only substantial changes are in the constrained assignment algorithm, while this test targets the general case of unequal subscriptions. It's not too surprising to me that this would be running up against the time limit anyway, since we've known that the general assignment algorithm is complicated and doesn't scale too well. So I felt it was reasonable to give it a bit more time. Alternatively, we could just scale back a bit on the number of partitions and/or consumers. WDYT @showuon?
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665031448

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
             // this consumer is potential maxQuota candidate since we're still under the number of expected members
             // with more than the minQuota partitions. Note, if the number of expected members with more than
-            // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+            // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
             if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                unfilledMembers.add(consumer);
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
This is part of fix #3 -- basically we have to handle these members separately; once they get up to `minQuota` they are only "potentially" unfilled. Once the last member allowed reaches `maxQuota`, all of these `minQuota` members are suddenly considered filled. cc @showuon
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030959

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)));
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"

Review comment:
Strictly speaking this should never, ever happen; even if we do get these "impossible" doubly-claimed partitions, we're also removing them from the `ownedPartitions` above (that's fix #2). But I put in a safeguard just in case, and it shouldn't hurt (performance-wise we should generally not even enter this loop, since `partitionsWithMultiplePreviousOwners` should almost always be empty).
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030173

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             for (final TopicPartition tp : memberData.partitions) {
                 // filter out any topics that no longer exist or aren't part of the current subscription
                 if (allTopics.contains(tp.topic())) {
-                    ownedPartitions.add(tp);
+
+                    if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                        allPreviousPartitionsToOwner.put(tp, consumer);
+                        ownedPartitions.add(tp);
+                    } else {
+                        String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+                        log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
This is fix #2 -- if we somehow still get multiple consumers claiming a partition in the same generation, we have to consider both invalid and remove it from their `ownedPartitions`.
ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665029950

File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         // If the current member's generation is higher, all the previously owned partitions are invalid
         if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
             membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+            allPreviousPartitionsToOwner.clear();
+            partitionsWithMultiplePreviousOwners.clear();
+            for (String droppedOutConsumer : membersWithOldGeneration) {
+                consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+            }

Review comment:
This part I just moved here to keep things up to date as we go; before, we were clearing them after the loop.