[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
------------------------------
    Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]).
 However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W4 is added after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens in this 
example (we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])

W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new members and
// didn't revoke any more C/T in this round, which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}
Because we don't allow revocation in two consecutive rebalances, we end up 
with this uneven distribution in this situation. We should allow a consecutive 
rebalance to run another round of revocation so that the C/T can be 
redistributed to the other members in this case.
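
To make "uneven" concrete, here is a minimal sketch. It is not Kafka Connect code: the class BalanceCheckSketch and its methods isBalanced and excess are hypothetical names introduced only for illustration. It treats connectors and tasks as one pool, as the example does, and assumes a distribution counts as balanced when no worker holds more than one C/T above any other.

```java
import java.util.*;

// Illustrative only; names are hypothetical and not part of Kafka Connect.
public class BalanceCheckSketch {

    // Balanced (by assumption): the largest and smallest loads differ by at most one.
    static boolean isBalanced(Collection<Integer> loads) {
        return Collections.max(loads) - Collections.min(loads) <= 1;
    }

    // Number of connectors/tasks that overloaded workers would have to give up
    // so that nobody holds more than ceil(total / workers).
    static int excess(Collection<Integer> loads) {
        int total = 0;
        for (int load : loads) {
            total += load;
        }
        int ceiling = (total + loads.size() - 1) / loads.size();
        int excess = 0;
        for (int load : loads) {
            excess += Math.max(0, load - ceiling);
        }
        return excess;
    }

    public static void main(String[] args) {
        List<Integer> observed = Arrays.asList(4, 3, 3, 2); // W1..W4 after the 2nd rebalance
        List<Integer> balanced = Arrays.asList(3, 3, 3, 3);
        System.out.println(isBalanced(observed) + " " + excess(observed)); // false 1
        System.out.println(isBalanced(balanced) + " " + excess(balanced)); // true 0
    }
}
```

For the loads seen after the second rebalance (4, 3, 3, 2), the sketch reports an unbalanced group with one C/T in excess on W1.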

expected:
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])

W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new members,
// **and also revoked some C/T**
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])

// another round of rebalance to assign the newly revoked C/T to the other members
W1 rejoins with assignment: [AC0, AT1, AT2]
Rebalance is triggered
W2 joins with assignment: [AT4, AT5, BC0]
W3 joins with assignment: [BT1, BT2, BT3]
W4 joins with assignment: [BT4, BT5]

W1 becomes leader
W1 computes and sends assignments:

// (final) We assigned all the previously revoked Connectors/Tasks to the members
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
{code}
Note: The consumer's cooperative sticky assignor doesn't have this issue 
because it re-computes the assignment in each round.
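
The difference between the two flows above can be replayed with a toy model. This is a sketch under stated assumptions, not the Connect assignor: the class RebalanceRoundsSketch, the allowConsecutiveRevocation switch, and the "fill each worker up to ceil(total / workers)" rule are all hypothetical simplifications, and connectors and tasks are treated as one pool. A follow-up round is assumed to happen only when the previous round revoked something.

```java
import java.util.*;

// Toy model of the rounds in the listings above; not the Connect assignor.
public class RebalanceRoundsSketch {
    private final Map<String, List<String>> assignment = new LinkedHashMap<>();
    private List<String> pending = new ArrayList<>(); // revoked earlier, not yet reassigned
    private final int total;
    private final boolean allowConsecutiveRevocation; // hypothetical switch
    private boolean revokedLastRound = false;

    RebalanceRoundsSketch(List<String> all, boolean allowConsecutiveRevocation) {
        this.total = all.size();
        this.allowConsecutiveRevocation = allowConsecutiveRevocation;
        assignment.put("W1", new ArrayList<>(all)); // W1 starts with everything
    }

    void join(String worker) {
        assignment.putIfAbsent(worker, new ArrayList<>());
    }

    // Runs one round and returns how many connectors/tasks were revoked in it.
    int rebalance() {
        int workers = assignment.size();
        int ceiling = (total + workers - 1) / workers; // ceil(total / workers)
        List<String> revoked = new ArrayList<>();
        if (allowConsecutiveRevocation || !revokedLastRound) {
            for (List<String> held : assignment.values()) {
                if (held.size() > ceiling) {
                    List<String> extra = held.subList(ceiling, held.size());
                    revoked.addAll(extra);
                    extra.clear(); // removes the range from the worker's list
                }
            }
        }
        // Hand out what earlier rounds revoked, filling workers up to the ceiling.
        Iterator<String> it = pending.iterator();
        for (List<String> held : assignment.values()) {
            while (held.size() < ceiling && it.hasNext()) {
                held.add(it.next());
            }
        }
        List<String> next = new ArrayList<>(revoked);
        while (it.hasNext()) {
            next.add(it.next()); // anything that did not fit stays pending
        }
        pending = next;
        revokedLastRound = !revoked.isEmpty();
        return revoked.size();
    }

    List<Integer> loads() {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> held : assignment.values()) {
            sizes.add(held.size());
        }
        return sizes;
    }

    static List<Integer> simulate(boolean allowConsecutiveRevocation) {
        // 2 connectors + 10 tasks; the names are illustrative.
        List<String> all = Arrays.asList("AC0", "AT1", "AT2", "AT3", "AT4", "AT5",
                                         "BC0", "BT1", "BT2", "BT3", "BT4", "BT5");
        RebalanceRoundsSketch group = new RebalanceRoundsSketch(all, allowConsecutiveRevocation);
        group.join("W2");
        group.join("W3");
        group.rebalance();   // round 1: W1 keeps 4 and revokes 8
        group.join("W4");    // W4 joins before the follow-up round
        int revoked;
        do {
            revoked = group.rebalance(); // a revocation triggers one more round
        } while (revoked > 0);
        return group.loads();
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [4, 3, 3, 2]
        System.out.println(simulate(true));  // [3, 3, 3, 3]
    }
}
```

With the switch off, the model stops at 4/3/3/2 after the second round. With it on, W1 gives up AT3 in the second round and a third round hands it to W4, ending at 3/3/3/3.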

  was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]).
 However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W4 is added after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens in this 
example (we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])

W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new members,
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}
We cannot just assign the revoked C/T to the new members in the 2nd round of 
rebalance. We should have another round of revocation so that the C/T can be 
redistributed to the other members.

expected:
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])

W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new members,
// **and also revoked some C/T**
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])

// another round of rebalance to assign the newly revoked C/T to the other members
W1 rejoins with assignment: [AC0, AT1, AT2]
Rebalance is triggered
W2 joins with assignment: [AT4, AT5, BC0]
W3 joins with assignment: [BT1, BT2, BT3]
W4 joins with assignment: [BT4, BT5]

W1 becomes leader
W1 computes and sends assignments:

// (final) We assigned all the previously revoked Connectors/Tasks to the members
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
{code}
Note: The consumer's cooperative sticky assignor doesn't have this issue 
because it re-computes the assignment in each round.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>         Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png


