[ https://issues.apache.org/jira/browse/KAFKA-16056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sagar Rao updated KAFKA-16056: ------------------------------ Description: When a poll timeout expiry happens for a worker, it triggers a rebalance because it leaves the group pro-actively. Under normal scenarios, this leaving the group would trigger a scheduled rebalance delay. However, one thing to note is that, the worker which left the group temporarily, doesn't give up it's assignments and whatever tasks were running on it would remain as is. When the scheduled rebalance delay elapses, it would just get back it's assignments but given that there won't be any revocations, it should all work out fine. But there is an edge case here. Let's assume that a scheduled rebalance delay was already active on a group and just before a follow up rebalance due to scheduled rebalance elapsing, one of the worker's poll timeout expires. At this point, a rebalance is imminent and the leader would track the assignments of the transiently departed worker as lost [here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255] . When [handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] gets triggered, because the scheduledRebalance delay isn't reset yet and if [this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] condition passes, the leader would assume that it needs to reassign all the lost assignments which it will. Another case where this can happen is if the rebalance itself was triggered due to worker's poll timeout expiry and for some reason, the worker just couldn't join back before scheduled.rebalance.delay elapses. We would notice duplicate task assignments even then. But because, the worker for which the poll timeout expired, doesn't rescind it's assignments we would end up noticing duplicate assignments- one set on the original worker which was already running the tasks and connectors and another set on the remaining group of workers which got the redistributed work. This could lead to task failures if connector has been written in a way which expects no duplicate tasks running across a cluster. Also, this edge case can be encountered more frequently if the `rebalance.timeout.ms` config is set to a lower value. One of the approaches could be to do something similar to https://issues.apache.org/jira/browse/KAFKA-9184 where upon coordinator discovery failure, the worker gives up all it's assignments and joins with an empty assignment. We could do something similar in this case as well. was: When a poll timeout expiry happens for a worker, it triggers a rebalance because it leaves the group pro-actively. Under normal scenarios, this leaving the group would trigger a scheduled rebalance delay. However, one thing to note is that, the worker which left the group temporarily, doesn't give up it's assignments and whatever tasks were running on it would remain as is. When the scheduled rebalance delay elapses, it would just get back it's assignments but given that there won't be any revocations, it should all work out fine. But there is an edge case here. Let's assume that a scheduled rebalance delay was already active on a group and just before a follow up rebalance due to scheduled rebalance elapsing, one of the worker's poll timeout expires. At this point, a rebalance is imminent and the leader would track the assignments of the transiently departed worker as lost [here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255] . When [handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] gets triggered, because the scheduledRebalance delay isn't reset yet and if [this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] condition passes, the leader would assume that it needs to reassign all the lost assignments which it will. But because, the worker for which the poll timeout expired, doesn't rescind it's assignments we would end up noticing duplicate assignments- one set on the original worker which was already running the tasks and connectors and another set on the remaining group of workers which got the redistributed work. This could lead to task failures if connector has been written in a way which expects no duplicate tasks running across a cluster. Also, this edge case can be encountered more frequently if the `rebalance.timeout.ms` config is set to a lower value. One of the approaches could be to do something similar to https://issues.apache.org/jira/browse/KAFKA-9184 where upon coordinator discovery failure, the worker gives up all it's assignments and joins with an empty assignment. We could do something similar in this case as well. > Worker poll timeout expiry can lead to Duplicate task assignments. > ------------------------------------------------------------------ > > Key: KAFKA-16056 > URL: https://issues.apache.org/jira/browse/KAFKA-16056 > Project: Kafka > Issue Type: Bug > Components: connect > Reporter: Sagar Rao > Assignee: Sagar Rao > Priority: Major > Labels: connect > > When a poll timeout expiry happens for a worker, it triggers a rebalance > because it leaves the group pro-actively. Under normal scenarios, this > leaving the group would trigger a scheduled rebalance delay. However, one > thing to note is that, the worker which left the group temporarily, doesn't > give up it's assignments and whatever tasks were running on it would remain > as is. When the scheduled rebalance delay elapses, it would just get back > it's assignments but given that there won't be any revocations, it should all > work out fine. > But there is an edge case here. Let's assume that a scheduled rebalance delay > was already active on a group and just before a follow up rebalance due to > scheduled rebalance elapsing, one of the worker's poll timeout expires. At > this point, a rebalance is imminent and the leader would track the > assignments of the transiently departed worker as lost > [here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255] > . When > [handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] > gets triggered, because the scheduledRebalance delay isn't reset yet and if > [this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] > condition passes, the leader would assume that it needs to reassign all the > lost assignments which it will. Another case where this can happen is if the > rebalance itself was triggered due to worker's poll timeout expiry and for > some reason, the worker just couldn't join back before > scheduled.rebalance.delay elapses. We would notice duplicate task assignments > even then. > But because, the worker for which the poll timeout expired, doesn't rescind > it's assignments we would end up noticing duplicate assignments- one set on > the original worker which was already running the tasks and connectors and > another set on the remaining group of workers which got the redistributed > work. This could lead to task failures if connector has been written in a way > which expects no duplicate tasks running across a cluster. > Also, this edge case can be encountered more frequently if the > `rebalance.timeout.ms` config is set to a lower value. > One of the approaches could be to do something similar to > https://issues.apache.org/jira/browse/KAFKA-9184 where upon coordinator > discovery failure, the worker gives up all it's assignments and joins with an > empty assignment. We could do something similar in this case as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)