[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-12495:
------------------------------
Description:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect). However, the implementation makes a bad assumption: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.

Let's take a look at the example in KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Let's walk through this example (we'll use 10 tasks here):

{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked Connectors/Tasks to the new members, and didn't revoke any more C/T in this round, which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}

Because we don't allow revocations in two consecutive rebalances, we end up with this uneven distribution in this situation. We should allow the consecutive rebalance to perform another round of revocation so that the C/T can be moved to the other members in this case.
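To make the imbalance concrete, here is a small standalone sketch (plain Java, not the actual IncrementalCooperativeAssignor code; the worker and connector/task names just follow the example above) that reproduces the arithmetic of the faulty round: the 8 previously revoked connectors/tasks are spread only over the workers that currently own nothing, W1 keeps its 4, and no further revocation is planned, so the group ends up 4/3/3/2 instead of 3/3/3/3.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ImbalanceSketch {
    public static void main(String[] args) {
        // What the leader (W1) keeps after the revoking rebalance in the example above.
        List<String> retainedByW1 = List.of("AC0", "AT1", "AT2", "AT3");
        // The connectors/tasks W1 revoked in the previous round; they are currently unassigned.
        List<String> previouslyRevoked =
                List.of("AT4", "AT5", "BC0", "BT1", "BT2", "BT3", "BT4", "BT5");
        // W4 joined between the two rebalances, so the group is larger than before.
        List<String> emptyWorkers = List.of("W2", "W3", "W4");

        // Faulty round: only hand the previously revoked items to the empty workers,
        // without revoking anything further from W1.
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("W1", new ArrayList<>(retainedByW1));
        int index = 0;
        for (int i = 0; i < emptyWorkers.size(); i++) {
            int remainingWorkers = emptyWorkers.size() - i;
            int remainingTasks = previouslyRevoked.size() - index;
            int share = (remainingTasks + remainingWorkers - 1) / remainingWorkers; // ceiling division
            assignment.put(emptyWorkers.get(i),
                    new ArrayList<>(previouslyRevoked.subList(index, index + share)));
            index += share;
        }

        // Prints W1=4, W2=3, W3=3, W4=2: unbalanced, and since nothing was revoked
        // in this round, no follow-up rebalance will fix it.
        assignment.forEach((worker, owned) ->
                System.out.println(worker + " -> " + owned.size() + " " + owned));
    }
}
{code}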
Expected:
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked Connectors/Tasks to the new members, **and also revoked some more C/T**
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
// another round of rebalance to assign the newly revoked C/T to the other members
W1 rejoins with assignment: [AC0, AT1, AT2]
Rebalance is triggered
W2 joins with assignment: [AT4, AT5, BC0]
W3 joins with assignment: [BT1, BT2, BT3]
W4 joins with assignment: [BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
// (final) We assigned all the previously revoked Connectors/Tasks to the members
W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
{code}
Note: The consumer's cooperative sticky assignor won't have this issue, since the whole assignment is re-computed in each round.
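For illustration only, here is a rough sketch of the kind of follow-up revocation computation described above, assuming a simple "revoke down to the balanced ceiling" rule. The class and method names are hypothetical and are not the actual IncrementalCooperativeAssignor API.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RevocationSketch {

    /**
     * Hypothetical helper: given the current per-worker assignment, compute what each
     * worker should give up so that nobody owns more than ceil(total / workerCount)
     * connectors/tasks. The revoked items can then be reassigned in the next rebalance.
     */
    static Map<String, List<String>> computeLoadBalancingRevocation(Map<String, List<String>> current) {
        int total = current.values().stream().mapToInt(List::size).sum();
        int ceiling = (total + current.size() - 1) / current.size(); // ceil(total / workerCount)

        Map<String, List<String>> toRevoke = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : current.entrySet()) {
            List<String> owned = entry.getValue();
            if (owned.size() > ceiling) {
                // Revoke the excess from the tail of this worker's assignment.
                toRevoke.put(entry.getKey(), new ArrayList<>(owned.subList(ceiling, owned.size())));
            }
        }
        return toRevoke;
    }

    public static void main(String[] args) {
        // State after the unbalanced round in the example above: W4 joined late and W1 is overloaded.
        Map<String, List<String>> current = new LinkedHashMap<>();
        current.put("W1", List.of("AC0", "AT1", "AT2", "AT3"));
        current.put("W2", List.of("AT4", "AT5", "BC0"));
        current.put("W3", List.of("BT1", "BT2", "BT3"));
        current.put("W4", List.of("BT4", "BT5"));

        // With 12 connectors/tasks and 4 workers the ceiling is 3, so only W1 revokes one
        // item (AT3), which the next rebalance can hand to W4, matching the expected flow above.
        System.out.println(computeLoadBalancingRevocation(current)); // {W1=[AT3]}
    }
}
{code}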
> Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>         Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)