[ 
https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377635#comment-17377635
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12477:
------------------------------------------------

No, we already decided to postpone this work to 3.1 so we can focus on some 
related things. Moved the Fix Version to 3.1.0

> Smart rebalancing with dynamic protocol selection
> -------------------------------------------------
>
>                 Key: KAFKA-12477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12477
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 3.1.0
>
>
> Users who want to upgrade their applications and enable the COOPERATIVE 
> rebalancing protocol in their consumer apps are required to follow a double 
> rolling bounce upgrade path. The reason for this is laid out in the [Consumer 
> Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
>  section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing 
> protocol in its constructor based on the list of supported partition 
> assignors. The protocol is selected as the highest protocol that is commonly 
> supported by all assignors in the list, and never changes after that.
> This is a bit unfortunate because it may end up using an older protocol even 
> after every member in the group has been updated to support the newer 
> protocol. After the first rolling bounce of the upgrade, all members will 
> have two assignors: "cooperative-sticky" and "range" (or 
> sticky/round-robin/etc). At this point the EAGER protocol will still be 
> selected due to the presence of the "range" assignor, but it's the 
> "cooperative-sticky" assignor that will ultimately be selected for use in 
> rebalances if that assignor is preferred (ie positioned first in the list). 
> The only reason for the second rolling bounce is to strip off the "range" 
> assignor and allow the upgraded members to switch over to COOPERATIVE. We 
> can't allow them to use cooperative rebalancing until everyone has been 
> upgraded, but once they have it's safe to do so.
> And there is already a way for the client to detect that everyone is on the 
> new byte code: if the CooperativeStickyAssignor is selected by the group 
> coordinator, then that means it is supported by all consumers in the group 
> and therefore everyone must be upgraded. 
> We may be able to save the second rolling bounce by dynamically updating the 
> rebalancing protocol inside the ConsumerCoordinator as "the highest protocol 
> supported by the assignor chosen by the group coordinator". This means we'll 
> still be using EAGER at the first rebalance, since we of course need to wait 
> for this initial rebalance to get the response from the group coordinator. 
> But we should take the hint from the chosen assignor rather than dropping 
> this information on the floor and sticking with the original protocol.
> Concrete Proposal:
> This assumes we will change the default assignor to ["cooperative-sticky", 
> "range"] in KIP-726. It also acknowledges that users may attempt any kind of 
> upgrade without reading the docs, and so we need to put in safeguards against 
> data corruption rather than assume everyone will follow the safe upgrade path.
> With this proposal, 
> 1) New applications on 3.0 will enable cooperative rebalancing by default
> 2) Existing applications which don’t set an assignor can safely upgrade to 
> 3.0 using a single rolling bounce with no extra steps, and will automatically 
> transition to cooperative rebalancing
> 3) Existing applications which do set an assignor that uses EAGER can 
> likewise upgrade their applications to COOPERATIVE with a single rolling 
> bounce
> 4) Once on 3.0, applications can safely go back and forth between EAGER and 
> COOPERATIVE
> 5) Applications can safely downgrade away from 3.0
> The high-level idea for dynamic protocol upgrades is that the group will 
> leverage the assignor selected by the group coordinator to determine when 
> it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the 
> group in case of rare events or user misconfiguration. The group coordinator 
> selects the most preferred assignor that’s supported by all members of the 
> group, so we know that all members will support COOPERATIVE once we receive 
> the “cooperative-sticky” assignor after a rebalance. At this point, each 
> member can upgrade their own protocol to COOPERATIVE. However, there may be 
> situations in which an EAGER member may join the group even after upgrading 
> to COOPERATIVE. For example, during a rolling upgrade if the last remaining 
> member on the old bytecode misses a rebalance, the other members will be 
> allowed to upgrade to COOPERATIVE. If the old member rejoins and is chosen to 
> be the group leader before it’s upgraded to 3.0, it won’t be aware that the 
> other members of the group have not yet revoked their partitions when 
> computing the assignment.
> Short Circuit:
> The risk of mixing the cooperative and eager rebalancing protocols is that a 
> partition may be assigned to one member while it has yet to be revoked from 
> its previous owner. The danger is that the new owner may begin processing and 
> committing offsets for this partition while the previous owner is also 
> committing offsets in its #onPartitionsRevoked callback, which is invoked at 
> the end of the rebalance in the cooperative protocol. This can result in 
> these consumers overwriting each other’s offsets and getting a corrupted view 
> of the partition. Note that it’s not possible to commit during a rebalance, 
> so we can protect against offset corruption by blocking further commits after 
> we detect that the group leader may not understand COOPERATIVE, but before we 
> invoke #onPartitionsRevoked. This is the “short-circuit” — if we detect that 
> the group is in an unsafe state, we invoke #onPartitionsLost instead of 
> #onPartitionsRevoked and explicitly prevent offsets from being committed on 
> those revoked partitions.
> Consumer procedure:
> Upon startup, the consumer will initially select the highest 
> commonly-supported protocol across its configured assignors. With 
> ["cooperative-sticky", "range”], the initial protocol will be EAGER when the 
> member first joins the group. Following a rebalance, each member will check 
> the selected assignor. If the chosen assignor supports COOPERATIVE, the 
> member can upgrade their used protocol to COOPERATIVE and no further action 
> is required. If the member is already on COOPERATIVE but the selected 
> assignor does NOT support it, then we need to trigger the short-circuit. In 
> this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked, 
> and set a flag to block any attempts at committing those partitions which 
> have been revoked. If a commit is attempted, as may be the case if the user 
> does not implement #onPartitionsLost (see KAFKA-12638), we will throw a 
> CommitFailedException which will be bubbled up through poll() after 
> completing the rebalance. The member will then downgrade its protocol to 
> EAGER for the next rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to