[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-13 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r669157613



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
                 // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
                 numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   Thanks @showuon and @guozhangwang, I think that all makes sense. One of my primary motivations was to keep all data structures consistent at all times with what they represent, so they can always be relied upon at any point. For that reason I also prefer to keep things as is, and clear the `potentiallyUnfilledMembersAtMinQuota` (now renamed to `unfilledMembersWithExactlyMinQuotaPartitions`) as soon as we have filled the last member who may be above `minQuota`, at which point all of the members at exactly `minQuota` are no longer considered "unfilled".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668410517



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
+                              unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-            if (currentAssignedCount == expectedAssignedCount) {
-                if (currentAssignedCount == maxQuota) {
-                    numMembersAssignedOverMinQuota++;
-                }
+            if (currentAssignedCount == minQuota) {
                 unfilledConsumerIter.remove();
+                potentiallyUnfilledMembersAtMinQuota.add(consumer);
+            } else if (currentAssignedCount == maxQuota) {
+                numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled
+                    // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we
+                    // should be done. But in case of some algorithmic error, just log a warning and continue to
+                    // assign any remaining partitions within the assignment constraints
+                    if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) {
+                        log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, "

Review comment:
   I responded to the above comment as well, but specifically here I think that checking only on that condition requires us to make assumptions about the algorithm's correctness up to this point (and the correctness of its assumptions). But if those were all correct then we would never reach this to begin with, so it's better to directly look for any remaining `unassignedPartitions` -- it's a sanity check.

[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668409463



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -205,6 +237,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
                 // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
                 numMembersAssignedOverMinQuota++;
+                if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) {
+                    potentiallyUnfilledMembersAtMinQuota.clear();

Review comment:
   While I'm not really a fan of the `potentiallyUnfilledMembersAtMinQuota` logic (it's definitely awkward, but I felt it was still the lesser evil in terms of complicating the code), I don't think we can get rid of it that easily. The problem is that when `minQuota != maxQuota`, and so far `currentNumMembersWithOverMinQuotaPartitions` < `expectedNumMembersWithOverMinQuotaPartitions`, consumers that are filled up to exactly `minQuota` have to be considered potentially not yet at capacity, since some (though not all) will need one more partition. So this data structure is not just used to verify that everything is properly assigned after we've exhausted the `unassignedPartitions`; it's used to track which consumers can still receive another partition (i.e., are "unfilled"). Does that make sense?
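   The quota arithmetic this discussion hinges on can be sketched as follows. This is a minimal illustration under my reading of the comments, with simplified names -- not the actual assignor code:

```java
// Illustrative quota math for a constrained sticky assignment (simplified names).
// With n partitions and m consumers: every member gets at least minQuota = n / m,
// and exactly n % m members are expected to end up over minQuota (i.e. at maxQuota).
// When n % m == 0, minQuota == maxQuota and no member is ever "over" minQuota.
public class QuotaMath {
    static int minQuota(int numPartitions, int numConsumers) {
        return numPartitions / numConsumers;
    }

    static int maxQuota(int numPartitions, int numConsumers) {
        // ceiling division
        return (numPartitions + numConsumers - 1) / numConsumers;
    }

    static int expectedNumMembersOverMinQuota(int numPartitions, int numConsumers) {
        return numPartitions % numConsumers;
    }
}
```

   For example, 7 partitions over 3 consumers gives `minQuota = 2`, `maxQuota = 3`, and one member expected over `minQuota`; a balanced case like 6 over 3 yields 0 expected members over `minQuota`, which is exactly the `minQuota == maxQuota` situation discussed here.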








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406999



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
                 // this consumer is potential maxQuota candidate since we're still under the number of expected members
                 // with more than the minQuota partitions. Note, if the number of expected members with more than
-                // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+                // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
                 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                    unfilledMembers.add(consumer);
+                    potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   For the 3rd & 4th suggested renamings, it's a bit subtle, but this would actually be incorrect. In the case `minQuota == maxQuota`, the `expectedNumMembersAssignedOverMinQuota` variable will evaluate to 0, which would not make sense if it was called `expectedNumMembersWithMaxQuota`. Of course we could go through and tweak the logic for this case, but I'd prefer not to mix that into this PR. For now I'll just clarify in the comments for these variables.
   (I did still rename them slightly to hopefully be more clear, and also in line with the new names of the other two data structures we renamed.)








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668406080



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
                 // this consumer is potential maxQuota candidate since we're still under the number of expected members
                 // with more than the minQuota partitions. Note, if the number of expected members with more than
-                // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+                // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
                 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                    unfilledMembers.add(consumer);
+                    potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   Ack on the first two renamings, though I'd still want to prefix them with `unfilled` to emphasize that these structures only hold members that may potentially be assigned one or more partitions. I.e., if `minQuota == maxQuota`, then `potentiallyUnfilledMembersAtMinQuota` should actually be empty, in which case `MembersWithExactMinQuotaPartitions` doesn't quite make sense. I'll clarify this in the comments as well.








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402587



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             for (final TopicPartition tp : memberData.partitions) {
                 // filter out any topics that no longer exist or aren't part of the current subscription
                 if (allTopics.contains(tp.topic())) {
-                    ownedPartitions.add(tp);
+
+                    if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                        allPreviousPartitionsToOwner.put(tp, consumer);
+                        ownedPartitions.add(tp);
+                    } else {
+                        String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+                        log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
   Good point -- yes, I would absolutely want/hope a user would report this as a bug. Changed to ERROR.








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-12 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r668402323



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -121,7 +129,12 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
             // If the current member's generation is higher, all the previously owned partitions are invalid
             if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                allPreviousPartitionsToOwner.clear();

Review comment:
   In that case, it's never added to `consumerToOwnedPartitions` in the first place. This map is not pre-filled; it gets populated inside this loop. So if its generation is `< maxGeneration`, then we just insert an empty list into the map for that member's owned partitions.
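   A hedged sketch of that population pattern, with hypothetical names (`String` stands in for `TopicPartition`; the real code derives this from subscription metadata):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the pattern described above: the ownership map is populated inside
// the loop, and a member below the current max generation contributes an empty
// list -- its stale claims are simply never recorded. Names are simplified.
public class OwnedPartitionsSketch {
    static Map<String, List<String>> collectOwnedPartitions(
            Map<String, Integer> memberGeneration,
            Map<String, List<String>> claimedPartitions,
            int maxGeneration) {
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        for (Map.Entry<String, Integer> member : memberGeneration.entrySet()) {
            String consumer = member.getKey();
            if (member.getValue() < maxGeneration) {
                // Stale generation: insert an empty list, its claims are invalid
                consumerToOwnedPartitions.put(consumer, new ArrayList<>());
            } else {
                consumerToOwnedPartitions.put(consumer,
                        new ArrayList<>(claimedPartitions.getOrDefault(consumer, List.of())));
            }
        }
        return consumerToOwnedPartitions;
    }
}
```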








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-07 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665794341



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -238,32 +272,50 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
         // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
+                              unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled consumers to be assigned.");
+                } else if (unfilledMembers.isEmpty()) {
+                    consumer = potentiallyUnfilledMembersAtMinQuota.poll();
+                } else {
+                    unfilledConsumerIter = unfilledMembers.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+            // members claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);

Review comment:
   That's a good idea. But WDYT about doing this in a small followup PR? 
Partly to keep this PR from growing too long (it's already much longer than I 
originally intended), and partly so we can get this one merged ASAP since it's 
a blocker.
   
   Actually, would you be interested in working on that test (or tests) 
yourself, after this PR can be merged? At this point I think you're probably 
the most familiar with this code outside of myself








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-07 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665767593



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             // If the current member's generation is higher, all the previously owned partitions are invalid
             if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
                 membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+                allPreviousPartitionsToOwner.clear();
+                partitionsWithMultiplePreviousOwners.clear();
+                for (String droppedOutConsumer : membersWithOldGeneration) {
+                    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                }

Review comment:
   Mm good catch








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-07 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665750661



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
##
@@ -53,9 +56,34 @@ public String name() {
         return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
     }
 
+    @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
+        this.generation = metadata.generationId();
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        if (generation == DEFAULT_GENERATION) {

Review comment:
   Yeahh...will do








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-07 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665240093



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##
@@ -555,7 +558,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() {
         assignor.assign(partitionsPerTopic, subscriptions);
     }
 
-    @Timeout(40)
+    @Timeout(60)

Review comment:
   Saw this fail sometimes when running locally. This PR definitely should not have significantly affected the performance of this test in any way, since the only substantial changes are in the constrained assignment algorithm, while this test targets the general case of unequal subscriptions. It's not too surprising to me that this would be running up against the time limit anyway, since we've known that the general assignment algorithm is complicated and doesn't scale too well. So I felt it was reasonable to give it a bit more time.
   
   Alternatively we could just scale back a bit on the number of partitions and/or consumers. WDYT @showuon?








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665031448



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
                 // this consumer is potential maxQuota candidate since we're still under the number of expected members
                 // with more than the minQuota partitions. Note, if the number of expected members with more than
-                // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+                // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
                 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-                    unfilledMembers.add(consumer);
+                    potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   This is part of fix #3 -- basically we have to handle these members separately: once they get up to `minQuota` they are only "potentially" unfilled. Once the last allowed member reaches `maxQuota`, all of these `minQuota` members are suddenly considered filled. cc @showuon
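   The rule being described can be sketched as a small predicate. This is illustrative only, under my reading of the comment, with simplified parameter names:

```java
// Sketch of when a member can still receive another partition, per the discussion
// above: below minQuota it is plainly unfilled; at exactly minQuota it is only
// "potentially" unfilled, and only while fewer than the expected number of
// members have been filled past minQuota (i.e. up to maxQuota).
public class UnfilledRule {
    static boolean canReceiveAnotherPartition(int currentCount,
                                              int minQuota,
                                              int maxQuota,
                                              int numMembersAssignedOverMinQuota,
                                              int expectedNumMembersAssignedOverMinQuota) {
        if (currentCount < minQuota) {
            return true; // plainly unfilled
        }
        return currentCount == minQuota
                && minQuota != maxQuota
                && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota;
    }
}
```

   Note how, once the "over minQuota" count reaches the expected count, every member sitting at exactly `minQuota` flips to filled at once -- which is why the set of potentially unfilled members can be cleared in bulk at that moment.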








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030959



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)));
 
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
 
+            for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"

Review comment:
   Strictly speaking this should never ever happen: even if we do get these "impossible" doubly-claimed partitions, we're also removing them from the `ownedPartitions` above (that's fix #2). But I put in a safeguard just in case; it shouldn't hurt (performance-wise we should generally not even enter this loop, since `partitionsWithMultiplePreviousOwners` should almost always be empty).








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030173



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             for (final TopicPartition tp : memberData.partitions) {
                 // filter out any topics that no longer exist or aren't part of the current subscription
                 if (allTopics.contains(tp.topic())) {
-                    ownedPartitions.add(tp);
+
+                    if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                        allPreviousPartitionsToOwner.put(tp, consumer);
+                        ownedPartitions.add(tp);
+                    } else {
+                        String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+                        log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
   This is fix #2 -- if we somehow still get multiple consumers claiming a partition in the same generation, we have to consider both claims invalid and remove the partition from their `ownedPartitions`.
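   The invalidation described here can be sketched as follows. This is a simplified, standalone illustration (`String` stands in for `TopicPartition`, and the method names are hypothetical), not the actual assignor code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the duplicate-ownership check: the first claimant of a partition is
// recorded in a partition->owner map; if a second claimant appears in the same
// generation, the partition is flagged and also removed from the first
// claimant's owned list, so neither member keeps it.
public class DuplicateClaimSketch {
    static Set<String> invalidateDoublyClaimed(Map<String, List<String>> memberClaims,
                                               Map<String, List<String>> ownedPartitions) {
        Map<String, String> partitionToOwner = new HashMap<>();
        Set<String> doublyClaimed = new HashSet<>();
        for (Map.Entry<String, List<String>> member : memberClaims.entrySet()) {
            String consumer = member.getKey();
            List<String> owned = ownedPartitions.computeIfAbsent(consumer, c -> new ArrayList<>());
            for (String tp : member.getValue()) {
                String previousOwner = partitionToOwner.putIfAbsent(tp, consumer);
                if (previousOwner == null) {
                    owned.add(tp); // first claim: tentatively keep it
                } else {
                    doublyClaimed.add(tp); // conflicting claim: invalidate for both
                    ownedPartitions.get(previousOwner).remove(tp);
                }
            }
        }
        return doublyClaimed;
    }
}
```

   A doubly-claimed partition then falls through to the round-robin phase as an unassigned partition, which is why it also has to be treated as transferring ownership when it lands on its new consumer.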








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665029950



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             // If the current member's generation is higher, all the previously owned partitions are invalid
             if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
                 membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+                allPreviousPartitionsToOwner.clear();
+                partitionsWithMultiplePreviousOwners.clear();
+                for (String droppedOutConsumer : membersWithOldGeneration) {
+                    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                }

Review comment:
   This part I just moved here to keep things up to date as we go; before, we were clearing them after the loop.



