[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521996#comment-17521996
 ] 

Chris Egerton edited comment on KAFKA-12495 at 4/14/22 2:14 AM:
----------------------------------------------------------------

{quote}Allowing for consecutive revocations that happen immediately when an 
imbalance is detected might mean that the workers overreact to external 
circumstances that have caused an imbalance between the initial calculation of
task assignments of the revocation rebalance and the subsequent rebalance for 
the assignment of revoked tasks. Such circumstances might have to do with 
rolling upgrades, scaling a cluster up or down or simply might be caused by 
temporary instability.
{quote}
Have you identified a plausible case where this may be an issue? Load-balancing 
revocations are only necessary when the number of workers has increased (or 
when the number of connectors/tasks has decreased, although this is not 
addressed in the current rebalancing algorithm). At least with the case of new 
workers, is it really an overreaction to find connectors/tasks to allocate to 
them as soon as possible?
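For concreteness, the imbalance condition can be sketched like this (hypothetical helper names, not the actual IncrementalCooperativeAssignor API): any worker holding more than ceil(total / workers) connectors+tasks is over its balanced share and is a candidate for a load-balancing revocation.

```java
// Hypothetical helper, not actual Connect code: a worker holding more than
// ceil(totalAllocations / workerCount) connectors+tasks is over its balanced
// share and is a candidate for a load-balancing revocation.
public class BalanceCheck {

    // Ceiling division: the maximum number of allocations any single worker
    // should hold in a balanced assignment.
    static int maxPerWorker(int totalAllocations, int workerCount) {
        return (totalAllocations + workerCount - 1) / workerCount;
    }

    static boolean needsRevocation(int workerLoad, int totalAllocations, int workerCount) {
        return workerLoad > maxPerWorker(totalAllocations, workerCount);
    }
}
```

In the example below (12 connectors/tasks, 4 workers), the balanced maximum is 3 per worker, so a worker still holding 4 has exactly one allocation to give up.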

Going over the example cases provided:
 * With a rolling upgrade, the existing delayed rebalancing logic should 
already apply, preventing excessive revocations from taking place
 * With a cluster scale-down, no load-balancing revocations should be 
necessary, since the number of connectors/tasks per worker will increase, not 
decrease
 * With a cluster scale-up, immediate revocation is not only harmless, it is 
actually advantageous: it allows the new workers to begin work immediately 
instead of waiting for the scheduled rebalance delay to elapse. This could be 
crucial if there's a load burst across the cluster and an external auto-scaling 
process spins up new workers to respond as quickly as possible
 * With temporary instability, if workers fall out of the cluster, the existing 
delayed rebalancing logic should already apply, preventing excessive 
revocations from taking place. There may be another interpretation of what this 
scenario would look like in terms of workers leaving/joining the cluster; let 
me know if you had something else in mind

The only case I can think of where unconditionally delaying between revocations 
may be beneficial is if there's a rapid scale-up and then immediate scale-down 
of a cluster. If we hold off on revoking too many connectors/tasks from the 
pre-scale-up workers in the cluster, then we'll have to reassign fewer of them 
once the scale-down takes place. But unless I'm missing something, this is an 
unlikely edge case and should not be prioritized.
{quote}To shield ourselves from infinite such rebalances the leader should also 
keep track of how many such attempts have been made and stop attempting to 
balance out tasks after a certain number of tries. Of course every other normal 
rebalance should reset both this counter and possibly the delay.
{quote}
This is a great suggestion, especially since it can (and should) be implemented 
regardless of whether a delay is added between consecutive load-balancing 
revocations.
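A minimal sketch of that counter idea, assuming a hypothetical leader-side helper (all names here are illustrative, not the actual Connect implementation): cap consecutive load-balancing revocations, and reset the counter whenever a rebalance completes without one.

```java
// Hypothetical sketch of the suggested safeguard, not actual Connect code:
// the leader counts consecutive load-balancing revocations and stops
// attempting them after a configured cap; any rebalance that doesn't trigger
// a load-balancing revocation resets the counter.
public class RevocationLimiter {
    private final int maxConsecutiveRevocations;
    private int consecutiveRevocations = 0;

    public RevocationLimiter(int maxConsecutiveRevocations) {
        this.maxConsecutiveRevocations = maxConsecutiveRevocations;
    }

    // Whether the leader may perform a load-balancing revocation this round.
    public boolean mayRevoke() {
        return consecutiveRevocations < maxConsecutiveRevocations;
    }

    // Record that this round's assignment included a load-balancing revocation.
    public void recordRevocation() {
        consecutiveRevocations++;
    }

    // Any normal rebalance (no load-balancing revocation) resets the counter.
    public void recordNormalRebalance() {
        consecutiveRevocations = 0;
    }
}
```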


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Blocker
>             Fix For: 3.2.0
>
>         Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance 
> algorithm based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]).
>  However, the implementation makes a bad assumption: after the revoking 
> rebalance completes, the member (worker) count will be the same as in the 
> previous round of rebalance.
>  
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's walk through this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new 
> // members, but we didn't revoke any more C/T in this round, which causes an 
> // unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances 
> (under the same leader), we end up with this uneven distribution in this 
> situation. We should allow a consecutive rebalance to perform another round 
> of revocation so that the C/T can be reassigned to the other members in this 
> case.
> Expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new 
> // members, **and also revoked some more C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other 
> // members
> W1 rejoins with assignment: [AC0, AT1, AT2]
> Rebalance is triggered
> W2 joins with assignment: [AT4, AT5, BC0]
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> // (final) We assigned all the previously revoked Connectors/Tasks to the 
> // members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.
>  
> Note 2: this issue makes the KAFKA-12283 test flaky.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
