[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719570#comment-17719570 ] A. Sophie Blee-Goldman commented on KAFKA-14016:

Nice job [~pnee] - thanks for taking the time to patch this finally. To clarify, since there are a lot of tickets and PRs floating around here, the assignor fix is in [pull/13550|https://github.com/apache/kafka/pull/13550]

> Revoke more partitions than expected in Cooperative rebalance
> -------------------------------------------------------------
>
>                 Key: KAFKA-14016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14016
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.3.0
>            Reporter: Shawn Wang
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: new-rebalance-should-fix
>             Fix For: 3.5.0, 3.4.1
>
>         Attachments: CooperativeStickyAssignorBugReproduction.java
>
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some consumers didn't reset their generation and state after a sync-group failure with a REBALANCE_IN_PROGRESS error, so we fixed it by resetting the generationId (but not the memberId) when sync group fails with REBALANCE_IN_PROGRESS. That change missed part of the reset, so a follow-up change in https://issues.apache.org/jira/browse/KAFKA-13891 made it work.
> After applying this change, we found that consumers will sometimes revoke almost 2/3 of their partitions even with cooperative rebalancing enabled: if one consumer does a very quick re-join, the other consumers get REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before re-joining.
> Example:
> # Consumers A1-A10 (ten consumers) join and sync the group successfully with generation 1.
> # A new consumer B1 joins and starts a rebalance.
> # All consumers join successfully; A1 then needs to revoke a partition to transfer it to B1.
> # A1 does a very quick syncGroup and re-join, because it revoked a partition.
> # A2-A10 hadn't sent syncGroup before A1 re-joined, so when they send syncGroup they get REBALANCE_IN_PROGRESS.
> # A2-A10 revoke their partitions and re-join.
> So in this rebalance almost every partition is revoked, which largely defeats the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when *RebalanceInProgressException* errors happen in {*}sync group{*}" we need another way to fix it. Here is my proposal:
> # Revert the change in https://issues.apache.org/jira/browse/KAFKA-13891.
> # In the server-side coordinator's handleSyncGroup, when the generationId has been checked and the group state is PreparingRebalance, send the assignment along with the error code REBALANCE_IN_PROGRESS (I think it's safe since we verified the generation first).
> # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the assignment first and then set rejoinNeeded = true to make it re-join immediately.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
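The failure mode in the example above comes down to members revoking everything they own instead of only the difference between their old and new assignments. As a minimal sketch of the incremental diff cooperative rebalancing is supposed to preserve (the class and method names below are invented for illustration; this is not Kafka's actual ConsumerCoordinator logic, and partitions are modeled as comma-separated strings):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Toy model of the incremental diff at the heart of cooperative rebalancing:
// a member revokes only the partitions it owns but was not re-assigned,
// and keeps everything else without interruption.
public class CooperativeDiffSketch {

    private static Set<String> parse(String csv) {
        return csv.isEmpty() ? new TreeSet<>()
                             : new TreeSet<>(Arrays.asList(csv.split(",")));
    }

    // Partitions the member must give up: owned minus newly assigned.
    public static String revoked(String ownedCsv, String assignedCsv) {
        Set<String> owned = parse(ownedCsv);
        owned.removeAll(parse(assignedCsv));
        return String.join(",", owned);
    }

    // Partitions the member newly gains: assigned minus previously owned.
    public static String added(String ownedCsv, String assignedCsv) {
        Set<String> assigned = parse(assignedCsv);
        assigned.removeAll(parse(ownedCsv));
        return String.join(",", assigned);
    }

    public static void main(String[] args) {
        // A1 owns t-0 and t-1; the new assignment moves t-1 to B1.
        // Ideally only t-1 moves; the bug described above instead makes
        // A2-A10 revoke everything they own.
        System.out.println("revoked: " + revoked("t-0,t-1", "t-0")); // revoked: t-1
        System.out.println("added:   " + added("t-0,t-1", "t-0")); // added: (empty)
    }
}
```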
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704843#comment-17704843 ] tony mancill commented on KAFKA-14016:

Yes - thank you for the redirect [~pnee]!
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704833#comment-17704833 ] Philip Nee commented on KAFKA-14016:

Hey [~showuon] - It doesn't seem like the issue was resolved; rather, it appears sporadically, so it didn't get a follow-up. I see [~aiquestion] and [~ableegoldman] previously mentioned that we could patch the assignor logic to also account for the partition owner of the latest generation. Is that something we intend to do?
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704819#comment-17704819 ] Philip Nee commented on KAFKA-14016:

Actually [~tmancill] - I think this is similar to your problem? https://issues.apache.org/jira/browse/KAFKA-14639
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704805#comment-17704805 ] Philip Nee commented on KAFKA-14016:

Hey [~tmancill] - Thanks for chipping in. I think what you are seeing is a different issue: in your example, the consumers joining with a different generation are still holding onto some partitions, which causes those partitions to be assigned and revoked within a single rebalance cycle. Normally, the consumer with the younger generation would revoke these partitions prior to joining - see onPartitionsLost. Since it appears to be a different issue than this one, could you open a separate ticket with some client logs? It might be hard to debug based on your example alone. Thanks!
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704322#comment-17704322 ] tony mancill commented on KAFKA-14016:

We are seeing this issue with 3.3.2. Quoting from one of our engineers (who doesn't yet have a Jira account here):
{quote}Our goal is to ensure that {{onPartitionsRevoked()}} happens before {{onPartitionsAssigned()}} for every partition. From our testing, in cases where partitions from younger generations are part of the rebalance, it is possible to violate this contract and a partition is reassigned in one rebalance cycle.{quote}
I am attaching our repro case [^CooperativeStickyAssignorBugReproduction.java]
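The contract described in that quote can be checked mechanically by tracking ownership across callbacks. Below is a toy checker, not part of the Kafka API; the class name and the String-based member/partition identifiers are simplifications for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the invariant quoted above: a partition must be revoked from
// its current owner before any assignment callback hands it to a new one.
public class RevokeBeforeAssignChecker {
    private final Map<String, String> owner = new HashMap<>(); // partition -> member

    // Record a revocation callback for one member.
    public void onPartitionsRevoked(String member, String... partitions) {
        for (String tp : partitions) {
            owner.remove(tp, member); // drop ownership only if this member held it
        }
    }

    // Record an assignment callback; returns false when the contract is
    // violated, i.e. the partition still belongs to a different member.
    public boolean onPartitionsAssigned(String member, String... partitions) {
        boolean ok = true;
        for (String tp : partitions) {
            String current = owner.get(tp);
            if (current != null && !current.equals(member)) {
                ok = false; // reassigned without an intervening revoke
            }
            owner.put(tp, member);
        }
        return ok;
    }

    public static void main(String[] args) {
        RevokeBeforeAssignChecker checker = new RevokeBeforeAssignChecker();
        checker.onPartitionsAssigned("A1", "t-0");
        checker.onPartitionsRevoked("A1", "t-0");
        System.out.println(checker.onPartitionsAssigned("B1", "t-0")); // true: revoked first
    }
}
```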
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703826#comment-17703826 ] Philip Nee commented on KAFKA-14016:

Hmm, maybe it's a different issue. For 3.3.2 we are seeing partitions being assigned before revocation. I think that violates the contract.
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703825#comment-17703825 ] Luke Chen commented on KAFKA-14016:

It's been a while, so I can't remember it clearly. But it looks like this issue no longer exists after we reverted the previous change, so I think we can close it.
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703797#comment-17703797 ] Philip Nee commented on KAFKA-14016:

Hey all, in particular [~showuon] [~aiquestion] - I've been trying to understand and watch this issue for a while. I'm curious, what exactly do we want to achieve here? Is it that we want to minimize partition movement as much as possible? If one consumer joins with an earlier generation while all partitions are correctly assigned, is the desired behavior to have nothing revoked? (Assuming the partition assignment is already in a good state.) I wrote a small test to demonstrate what I think we want:

{code:java}
public void testExpiredGenerationNoPartitionMovement() {
    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    partitionsPerTopic.put(topic, 3);
    // consumer1 claims tp-0 at generation 2; consumer2 claims tp-1 and tp-2
    // at the older generation 1
    subscriptions.put(consumer1, buildSubscriptionV1(topics(topic), partitions(tp(topic, 0)), 2));
    subscriptions.put(consumer2, buildSubscriptionV1(topics(topic), partitions(tp(topic, 1), tp(topic, 2)), 1));
    Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
    // nothing should move: each consumer keeps what it already owns
    assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
    assertEquals(partitions(tp(topic, 1), tp(topic, 2)), assignment.get(consumer2));
    ...
}
{code}
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625551#comment-17625551 ] David Jacot commented on KAFKA-14016:

> The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner

I think that it does not work like this at the moment, but it should.
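The rule under discussion - treat the claimant with the highest valid generation as a partition's previous owner - can be sketched in isolation. This is a hypothetical model for illustration, not the real CooperativeStickyAssignor code, and it ignores the generation-validation bookkeeping the real assignor would need:

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch of generation-aware previous-owner selection: when several members
// claim the same partition in their subscription userdata, the claim carrying
// the highest generation wins.
public class PreviousOwnerByGeneration {

    public static final class Claim {
        final String member;
        final int generation;
        public Claim(String member, int generation) {
            this.member = member;
            this.generation = generation;
        }
    }

    // Returns the member whose claim carries the highest generation,
    // or "" when nobody claims the partition.
    public static String previousOwner(Claim... claims) {
        return Arrays.stream(claims)
                .max(Comparator.comparingInt(c -> c.generation))
                .map(c -> c.member)
                .orElse("");
    }

    public static void main(String[] args) {
        // A2 re-joined with a stale generation-1 claim on a partition that
        // B1 owns at generation 2, so B1 is treated as the previous owner.
        System.out.println(previousOwner(new Claim("A2", 1), new Claim("B1", 2))); // B1
    }
}
```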
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625442#comment-17625442 ] Shawn Wang commented on KAFKA-14016:

[~ableegoldman] Okay~ Submitted one: https://github.com/apache/kafka/pull/12794
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625360#comment-17625360 ] A. Sophie Blee-Goldman commented on KAFKA-14016: [~aiquestion] would you be interested in submitting a PR for reverting KAFKA-13891? It still needs a PR, but Luke or I can merge it as soon as the PR build finishes running. In the meantime I'll look into the assignor's handling of this kind of thing and submit a patch if we're missing the logic for this (which, from your report, it sounds like we are).
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625192#comment-17625192 ] Shawn Wang commented on KAFKA-14016: I also vote for reverting KAFKA-13891, since that stale state in step 3 happens randomly and doesn't have a big impact. But I don't think [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] can solve this problem: it moves the generationId from userData to ConsumerProtocolSubscription and doesn't change the logic of the built-in CooperativeStickyAssignor.
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625187#comment-17625187 ] Shawn Wang commented on KAFKA-14016: [~ableegoldman] Yes, we experienced KAFKA-13891. Here is the full history of our case. We have several consumer groups with more than 2000 consumers each, running Kafka Broker 2.3 and Kafka Client 2.5, and we enabled static membership and cooperative rebalancing to avoid stop-the-world time during rebalances. # We found that in some cases a partition would be assigned to two consumers, so we patched KAFKA-12984, KAFKA-12983, and KAFKA-13406. # After we deployed online, we found that some consumer groups would rebalance for a long time (2 hours) before finally reaching Stable, so we then patched KAFKA-13891. # After deploying that, we experienced more partition lag when rebalances happened; that is when I created this issue to ask for advice. # We actually worked around it by ignoring the generation value when the leader calculates the assignment (we just set every member's generation to unknown), and after more than 2 months online it looks good. Regarding "The sticky assignment algorithm should account for this and IIRC will basically consider whoever has the highest valid generation for a partition as its previous owner": I don't think the code implements it that way, see [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L127] (please kindly correct me if I'm wrong). In that code we clear all previously owned partitions whenever we see a higher generation, so only the ownedPartitions with the highest generation remain valid. I do think "consider whoever has the highest valid generation for a partition as its previous owner" would also be a fix for this.
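To make the behavior being debated concrete, here is a minimal Python sketch of the "highest valid generation wins" previous-owner selection. All names (`Claim`, `previous_owners`) are invented for illustration; the real logic lives in Kafka's Java `AbstractStickyAssignor` and may differ in detail.

```python
from dataclasses import dataclass

@dataclass
class Claim:
    """A member's subscription: its id, generation, and the partitions it claims to own."""
    consumer: str
    generation: int
    partitions: list

def previous_owners(claims):
    """For each partition, treat the claimant with the highest generation as its previous owner."""
    best_gen = {}  # partition -> highest generation seen so far
    owner = {}     # partition -> consumer holding that generation
    for c in claims:
        for p in c.partitions:
            # A strictly higher generation displaces an older claim on the same partition.
            if c.generation > best_gen.get(p, float("-inf")):
                best_gen[p] = c.generation
                owner[p] = c.consumer
    return owner

claims = [
    Claim("A1", 2, ["t-0", "t-1"]),  # rejoined quickly, newer generation
    Claim("A2", 1, ["t-1", "t-2"]),  # stale generation for t-1
]
print(previous_owners(claims))  # {'t-0': 'A1', 't-1': 'A1', 't-2': 'A2'}
```

Under this selection rule a consumer carrying an old but valid generation keeps ownership of any partition no newer member claims, which is the property Sophie's comment relies on.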
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625003#comment-17625003 ] Luke Chen commented on KAFKA-14016: [~ableegoldman], thanks for the comment. Yes, we have logic in the sticky assignor to protect against multiple consumers claiming the same partition in the same (highest) generation. The original concern was that the same logic doesn't exist in custom assignors. But after [KIP-792|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336614] is implemented and merged (which should happen in the next release, v3.4.0), we should not need to worry about it anymore. Therefore, for this issue, I'd also vote for just reverting the changes. Thanks.
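As a toy illustration of the guard Luke describes, the sketch below flags two consumers that claim the same partition at the same, highest generation. The function name and tuple shape are invented for this example; the actual check in the Java sticky assignor is more involved.

```python
def conflicting_claims(claims):
    """Return (partition, first_claimant, second_claimant) triples where two
    members claim the same partition at the same generation."""
    seen = {}       # partition -> (generation, consumer) for the best claim so far
    conflicts = []
    for consumer, generation, partitions in claims:
        for p in partitions:
            if p in seen and seen[p][0] == generation:
                # Same generation, same partition: both claims look equally valid.
                conflicts.append((p, seen[p][1], consumer))
            elif p not in seen or generation > seen[p][0]:
                seen[p] = (generation, consumer)
    return conflicts

# Two members both claim t-0 at generation 2 -> conflict detected.
print(conflicting_claims([("A1", 2, ["t-0"]), ("A2", 2, ["t-0"])]))  # [('t-0', 'A1', 'A2')]
```

When generations differ, the higher one simply wins and no conflict is reported, which is why carrying a stale-but-valid generation is mostly harmless for the built-in assignor.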
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624948#comment-17624948 ] A. Sophie Blee-Goldman commented on KAFKA-14016: Hey, I realize there's a lot of history leading up to this issue and the associated "fix", so forgive me for missing anything while I'm getting up to speed. But taking a step back: before we jump into the discussion about alternative fixes for KAFKA-13891, can we flesh out the actual underlying problem and take stock of what symptoms people have actually seen vs. theorized about? Sorry for pushing back on this; it just seems like we've been playing whack-a-mole with these rebalancing issues lately, and the moles have started to whack us back. I just want to encourage us all to approach this carefully so we're not having the exact same conversation and reaching for yet more alternative fixes by the next release. [~aiquestion] I guess this question is mostly directed at you, as the original reporter of KAFKA-13891: were you able to reproduce this, or did you experience it in a real application, or was it mainly filed to follow up on the suggestion in KAFKA-13419? Sorry to repeat a bit of the conversation over on KAFKA-13419, but for full context here I was curious about the actual symptoms of KAFKA-13891 vs. the scenario outlined in that ticket. Specifically, I think we need to expand and/or elaborate on the effect of having an old (but valid) generation when the consumer is still the most recent owner to claim its partitions. The sticky assignment algorithm should account for this and, IIRC, will basically consider whoever has the highest valid generation for a partition as its previous owner, which in this case would always be the consumer that received the REBALANCE_IN_PROGRESS error.
To summarize: basically I'm not convinced that failing to reset the generation has any impact on the assignor in this case, i.e., that there is an actual bug/problem described in KAFKA-13891. In fact, forcing the consumer to reset everything when a rebalance is restarted seems actively detrimental, as evidenced by this exact ticket. I would vote for just reverting the changes we made for that and calling it a day until/unless we see concrete evidence/symptoms that are impacting a real application. [~showuon] / [~aiquestion] / [~guozhang] Thoughts?