Sagar Rao created KAFKA-16056:
---------------------------------

             Summary: Worker poll timeout expiry can lead to Duplicate task 
assignments.
                 Key: KAFKA-16056
                 URL: https://issues.apache.org/jira/browse/KAFKA-16056
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Sagar Rao
            Assignee: Sagar Rao


When a poll timeout expiry happens for a worker, it triggers a rebalance 
because it leaves the group pro-actively. Under normal scenarios, this leaving 
the group would trigger a scheduled rebalance delay. However, one thing to note 
is that, the worker which left the group temporarily, doesn't give up it's 
assignments and whatever tasks were running on it would remain as is. When the 
scheduled rebalance delay elapses, it would just get back it's assignments but 
given that there won't be any revocations, it should all work out fine.

But there is an edge case here. Let's assume that a scheduled rebalance delay 
was already active on a group and just before a follow up rebalance due to 
scheduled rebalance elapsing, one of the worker's poll timeout expires. At this 
point, a rebalance is imminent and the leader would track the assignments of 
the transiently departed worker as lost 
[here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255]
 . When 
[handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441]
 gets triggered, because the scheduledRebalance delay isn't reset yet and if 
[this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473]
 condition passes, the leader would assume that it needs to reassign all the 
lost assignments which it will.

But because, the worker for which the poll timeout expired, doesn't rescind 
it's assignments we would end up noticing duplicate assignments- one set on the 
original worker which was already running the tasks and connectors and another 
set on the remaining group of workers which got the redistributed work. This 
could lead to task failures if connector has been written in a way which 
expects no duplicate tasks running across a cluster.

Also, this edge case can be encountered more frequently if the 
`rebalance.timeout.ms` config is set to a lower value. 

One of the approaches could be to do something similar to 
https://issues.apache.org/jira/browse/KAFKA-9184 where upon coordinator 
discovery failure, the worker gives up all it's assignments and joins with an 
empty assignment. We could do something similar in this case as well.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to