[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521996#comment-17521996 ]
Chris Egerton edited comment on KAFKA-12495 at 4/14/22 2:14 AM:
----------------------------------------------------------------

{quote}Allowing for consecutive revocations that happen immediately when an imbalance is detected might mean that the workers overreact to external circumstances that have caused an imbalance between the initial calculation of task assignments for the revocation rebalance and the subsequent rebalance for the assignment of revoked tasks. Such circumstances might have to do with rolling upgrades, scaling a cluster up or down, or simply might be caused by temporary instability.{quote}

Have you identified a plausible case where this may be an issue? Load-balancing revocations are only necessary when the number of workers has increased (or when the number of connectors/tasks has decreased, although this is not addressed in the current rebalancing algorithm). At least in the case of new workers, is it really an overreaction to find connectors/tasks to allocate to them as soon as possible? Going over the example cases provided:
* With a rolling upgrade, the existing delayed rebalancing logic should already apply, preventing excessive revocations from taking place
* With a cluster scale-down, no load-balancing revocations should be necessary, since the number of connectors/tasks per worker will increase, not decrease
* With a cluster scale-up, immediate revocation will not only not be harmful, it will actually be advantageous, as it will allow the new workers to begin work immediately instead of waiting for the scheduled rebalance delay to elapse. This could be crucial if there's a load burst across the cluster and an external auto-scaling process spins up new workers to respond as quickly as possible
* With temporary instability, if workers fall out of the cluster, the existing delayed rebalancing logic should already apply, preventing excessive revocations from taking place. There may be another interpretation of what this scenario would look like in terms of workers leaving/joining the cluster; let me know if you had something else in mind

The only case I can think of where unconditionally delaying between revocations may be beneficial is a rapid scale-up followed by an immediate scale-down of a cluster. If we hold off on revoking too many connectors/tasks from the pre-scale-up workers in the cluster, then we'll have to reassign fewer of them once the scale-down takes place. But unless I'm missing something, this is an unlikely edge case and should not be prioritized.

{quote}To shield ourselves from infinite such rebalances the leader should also keep track of how many such attempts have been made and stop attempting to balance out tasks after a certain number of tries. Of course every other normal rebalance should reset both this counter and possibly the delay.{quote}

This is a great suggestion, especially since it can (and should) be implemented regardless of whether a delay is added between consecutive load-balancing revocations.
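For illustration only, the bounded-retry idea above could be as small as a counter that the leader keeps between rebalances. The following is a minimal sketch, assuming the leader can tell a load-balancing revocation round apart from a normal rebalance; the names (RevocationLimiter, mayRevokeAgain, recordRevocationRound) are made up for this sketch and are not part of Connect's IncrementalCooperativeAssignor:

{code:java}
// Hypothetical helper the leader could hold between rebalances. It caps the number
// of successive load-balancing revocation rounds and resets whenever a normal
// rebalance (one that needed no load-balancing revocations) completes.
public class RevocationLimiter {

    private final int maxSuccessiveRevocations;
    private int successiveRevocations = 0;

    public RevocationLimiter(int maxSuccessiveRevocations) {
        this.maxSuccessiveRevocations = maxSuccessiveRevocations;
    }

    /** Checked by the leader before it issues another load-balancing revocation round. */
    public boolean mayRevokeAgain() {
        return successiveRevocations < maxSuccessiveRevocations;
    }

    /** Called after an assignment that included load-balancing revocations. */
    public void recordRevocationRound() {
        successiveRevocations++;
    }

    /** Called after any rebalance that did not require load-balancing revocations. */
    public void reset() {
        successiveRevocations = 0;
    }
}
{code}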
> Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Blocker
>             Fix For: 3.2.0
>         Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]). However, we have a bad assumption in the algorithm implementation: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.
>
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Let's see what will happen in this example (two connectors with five tasks each):
>
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new members,
> // but we didn't revoke any more C/T in this round, which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations across two consecutive rebalances (under the same leader), we end up with this uneven distribution in this situation. We should allow a consecutive rebalance to perform another round of revocation, so that the revoked C/T can be reassigned to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new members, **and also revoke some more C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other members
> W1 rejoins with assignment: [AC0, AT1, AT2]
> Rebalance is triggered
> W2 joins with assignment: [AT4, AT5, BC0]
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> // (final) We assigned all the previously revoked Connectors/Tasks to the members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
> Note: The consumer's cooperative sticky assignor won't have this issue, since we re-compute the assignment in each round.
>
> Note 2: this issue makes the KAFKA-12283 test flaky.
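To make the extra revocation round in the expected flow above a bit more concrete, here is a minimal, self-contained sketch of the arithmetic a leader could use to decide how much each overloaded worker should give up once W4 has joined. It is only an illustration under that assumption, not the actual IncrementalCooperativeAssignor code; the class and method names (LoadBalancingRevocation, revocationCounts) are made up here.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoadBalancingRevocation {

    /**
     * Returns, per worker, how many connectors/tasks it should revoke so that
     * no worker keeps more than ceil(total / workerCount) resources.
     */
    public static Map<String, Integer> revocationCounts(Map<String, List<String>> currentAssignments) {
        int totalResources = currentAssignments.values().stream().mapToInt(List::size).sum();
        int workerCount = currentAssignments.size();
        int maxPerWorker = (totalResources + workerCount - 1) / workerCount; // ceil(total / workers)

        Map<String, Integer> toRevoke = new HashMap<>();
        currentAssignments.forEach((worker, resources) -> {
            int excess = resources.size() - maxPerWorker;
            if (excess > 0) {
                toRevoke.put(worker, excess);
            }
        });
        return toRevoke;
    }

    public static void main(String[] args) {
        // State after the second rebalance in the example above: W4 joined late,
        // so W1 still holds four resources while the fair cap is ceil(12 / 4) = 3.
        Map<String, List<String>> assignments = Map.of(
                "W1", List.of("AC0", "AT1", "AT2", "AT3"),
                "W2", List.of("AT4", "AT5", "BC0"),
                "W3", List.of("BT1", "BT2", "BT3"),
                "W4", List.of("BT4", "BT5"));
        // Prints {W1=1}: W1 revokes one resource (e.g. AT3), to be reassigned in the next round.
        System.out.println(revocationCounts(assignments));
    }
}
{code}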