[ https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377635#comment-17377635 ]
A. Sophie Blee-Goldman commented on KAFKA-12477:
------------------------------------------------

No, we already decided to postpone this work to 3.1 so we can focus on some related things. Moved the Fix Version to 3.1.0.

> Smart rebalancing with dynamic protocol selection
> -------------------------------------------------
>
> Key: KAFKA-12477
> URL: https://issues.apache.org/jira/browse/KAFKA-12477
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Fix For: 3.1.0
>
>
> Users who want to upgrade their applications and enable the COOPERATIVE
> rebalancing protocol in their consumer apps are required to follow a double
> rolling bounce upgrade path. The reason for this is laid out in the [Consumer
> Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
> section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing
> protocol in its constructor based on the list of supported partition
> assignors. The protocol is selected as the highest protocol that is supported
> by every assignor in the list, and it never changes after that.
> This is a bit unfortunate because the consumer may end up using an older
> protocol even after every member in the group has been updated to support the
> newer protocol. After the first rolling bounce of the upgrade, all members
> will have two assignors: "cooperative-sticky" and "range" (or sticky,
> round-robin, etc.). At this point the EAGER protocol will still be selected
> due to the presence of the "range" assignor, but it's the
> "cooperative-sticky" assignor that will ultimately be selected for use in
> rebalances if that assignor is preferred (i.e. positioned first in the list).
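To make the static selection rule above concrete, here is a minimal, self-contained sketch. It does not use the real client classes: `StaticProtocolSelection`, its `Protocol` enum and `highestCommon` are hypothetical stand-ins, and each assignor is modelled only as the set of protocols it supports.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in, not the actual ConsumerCoordinator code.
final class StaticProtocolSelection {
    // Declared in ascending order so the enum's natural ordering ranks COOPERATIVE highest.
    enum Protocol { EAGER, COOPERATIVE }

    // Returns the highest protocol that every configured assignor supports.
    static Protocol highestCommon(List<? extends Set<Protocol>> supportedPerAssignor) {
        if (supportedPerAssignor.isEmpty()) {
            throw new IllegalArgumentException("at least one assignor is required");
        }
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (Set<Protocol> supported : supportedPerAssignor) {
            common.retainAll(supported); // keep only protocols shared by all assignors so far
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("assignors share no protocol");
        }
        return Collections.max(common);
    }
}
```

With one assignor supporting {EAGER, COOPERATIVE} and another supporting only {EAGER}, this returns EAGER, which matches the state of the group after the first rolling bounce.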
> The only reason for the second rolling bounce is to strip off the "range"
> assignor and allow the upgraded members to switch over to COOPERATIVE. We
> can't allow them to use cooperative rebalancing until everyone has been
> upgraded, but once they have, it's safe to do so.
> And there is already a way for the client to detect that everyone is on the
> new bytecode: if the CooperativeStickyAssignor is selected by the group
> coordinator, then it is supported by all consumers in the group and therefore
> everyone must have been upgraded.
> We may be able to save the second rolling bounce by dynamically updating the
> rebalancing protocol inside the ConsumerCoordinator to "the highest protocol
> supported by the assignor chosen by the group coordinator". This means we'll
> still be using EAGER at the first rebalance, since we of course need to wait
> for this initial rebalance to get the response from the group coordinator.
> But we should take the hint from the chosen assignor rather than dropping
> this information on the floor and sticking with the original protocol.
> Concrete Proposal:
> This assumes we will change the default assignor to ["cooperative-sticky",
> "range"] in KIP-726. It also acknowledges that users may attempt any kind of
> upgrade without reading the docs, and so we need to put in safeguards against
> data corruption rather than assume everyone will follow the safe upgrade path.
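For orientation, the assignor lists discussed above could be expressed as consumer properties roughly as follows. This is only a sketch: the `UpgradeConfigSketch` class and its method names are hypothetical, while the `partition.assignment.strategy` key and the two assignor class names are the ones used by the Java consumer.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class UpgradeConfigSketch {
    static final String STRATEGY_KEY = "partition.assignment.strategy";
    static final String COOPERATIVE_STICKY =
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";
    static final String RANGE =
        "org.apache.kafka.clients.consumer.RangeAssignor";

    // First rolling bounce: list both assignors, with the cooperative one preferred.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, COOPERATIVE_STICKY + "," + RANGE);
        return props;
    }

    // Second rolling bounce: strip the eager-only assignor.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.setProperty(STRATEGY_KEY, COOPERATIVE_STICKY);
        return props;
    }
}
```

If the dynamic selection works out as described, the list from `firstBounce()` would be the only change a user needs, and the second step would become unnecessary.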
> With this proposal,
> 1) New applications on 3.0 will enable cooperative rebalancing by default
> 2) Existing applications which don’t set an assignor can safely upgrade to
> 3.0 using a single rolling bounce with no extra steps, and will automatically
> transition to cooperative rebalancing
> 3) Existing applications which do set an assignor that uses EAGER can
> likewise upgrade to COOPERATIVE with a single rolling bounce
> 4) Once on 3.0, applications can safely go back and forth between EAGER and
> COOPERATIVE
> 5) Applications can safely downgrade away from 3.0
> The high-level idea for dynamic protocol upgrades is that the group will
> leverage the assignor selected by the group coordinator to determine when
> it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the
> group in case of rare events or user misconfiguration. The group coordinator
> selects the most preferred assignor that’s supported by all members of the
> group, so we know that all members support COOPERATIVE once we receive the
> “cooperative-sticky” assignor after a rebalance. At this point, each member
> can upgrade its own protocol to COOPERATIVE. However, there may be situations
> in which an EAGER member joins the group even after the other members have
> upgraded to COOPERATIVE. For example, if the last remaining member on the old
> bytecode misses a rebalance during a rolling upgrade, the other members will
> be allowed to upgrade to COOPERATIVE. If the old member rejoins and is chosen
> to be the group leader before it’s upgraded to 3.0, it won’t be aware, when
> computing the assignment, that the other members of the group have not yet
> revoked their partitions.
> Short Circuit:
> The risk of mixing the cooperative and eager rebalancing protocols is that a
> partition may be assigned to one member while it has yet to be revoked from
> its previous owner. The danger is that the new owner may begin processing and
> committing offsets for this partition while the previous owner is also
> committing offsets in its #onPartitionsRevoked callback, which is invoked at
> the end of the rebalance in the cooperative protocol. This can result in
> these consumers overwriting each other’s offsets and getting a corrupted view
> of the partition. Note that it’s not possible to commit during a rebalance,
> so we can protect against offset corruption by blocking further commits after
> we detect that the group leader may not understand COOPERATIVE, but before we
> invoke #onPartitionsRevoked. This is the “short-circuit”: if we detect that
> the group is in an unsafe state, we invoke #onPartitionsLost instead of
> #onPartitionsRevoked and explicitly prevent offsets from being committed on
> those revoked partitions.
> Consumer procedure:
> Upon startup, the consumer will initially select the highest
> commonly-supported protocol across its configured assignors. With
> ["cooperative-sticky", "range"], the initial protocol will be EAGER when the
> member first joins the group. Following a rebalance, each member will check
> the selected assignor. If the chosen assignor supports COOPERATIVE, the
> member can upgrade its protocol to COOPERATIVE and no further action is
> required. If the member is already on COOPERATIVE but the selected assignor
> does NOT support it, then we need to trigger the short-circuit. In this case
> we will invoke #onPartitionsLost instead of #onPartitionsRevoked, and set a
> flag to block any attempts at committing those partitions which have been
> revoked. If a commit is attempted, as may be the case if the user does not
> implement #onPartitionsLost (see KAFKA-12638), we will throw a
> CommitFailedException which will be bubbled up through poll() after
> completing the rebalance. The member will then downgrade its protocol to
> EAGER for the next rebalance.
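The consumer procedure above can be read as a small state machine. The sketch below is one possible reading, not the proposed implementation: `DynamicProtocolSketch`, its enums and its methods are hypothetical, `IllegalStateException` stands in for CommitFailedException, and clearing the commit block at the start of the next rebalance is an assumption the description does not spell out.

```java
// Hypothetical model of a single group member; no real Kafka classes are used.
final class DynamicProtocolSketch {
    enum Protocol { EAGER, COOPERATIVE }
    enum Callback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // With ["cooperative-sticky", "range"] the member starts on EAGER.
    private Protocol protocol = Protocol.EAGER;
    private boolean commitsBlocked = false;

    Protocol protocol() {
        return protocol;
    }

    // Called after each rebalance with a property of the assignor the group coordinator chose.
    // Returns which callback should be used for the revoked partitions.
    Callback onRebalanceComplete(boolean chosenAssignorSupportsCooperative) {
        commitsBlocked = false; // assumption: the block only covers one rebalance
        if (chosenAssignorSupportsCooperative) {
            protocol = Protocol.COOPERATIVE; // every member supports it, safe to upgrade
            return Callback.ON_PARTITIONS_REVOKED;
        }
        if (protocol == Protocol.COOPERATIVE) {
            // Short-circuit: the group may be led by a member that only understands EAGER.
            commitsBlocked = true;
            protocol = Protocol.EAGER; // downgrade for the next rebalance
            return Callback.ON_PARTITIONS_LOST;
        }
        return Callback.ON_PARTITIONS_REVOKED; // still on EAGER, nothing to change
    }

    // Stand-in for an offset commit on the revoked partitions.
    void commitRevokedPartitions() {
        if (commitsBlocked) {
            throw new IllegalStateException("commit blocked after short-circuit");
        }
    }
}
```

A member that upgrades after one rebalance and then sees an eager-only assignor chosen in the next would get ON_PARTITIONS_LOST, refuse the commit, and be back on EAGER afterwards.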
-- This message was sent by Atlassian Jira (v8.3.4#803005)