[ 
https://issues.apache.org/jira/browse/KAFKA-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-10651:
----------------------------------------------

    Assignee: A. Sophie Blee-Goldman

> Assignor reports offsets from uninitialized task
> ------------------------------------------------
>
>                 Key: KAFKA-10651
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10651
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.7.0, 2.6.1
>
>
> In KIP-441, the new HA assignor makes an informed decision about stateful
> task placement based on the offset sums reported by each instance. Offset
> sums are computed in one of two ways: for assigned tasks (i.e., those in the
> TaskManager's "tasks" map), the instance sums the tasks' changelog offsets
> directly; for tasks that are not assigned but whose directory remains on
> disk, it reads the changelog offsets from the checkpoint file. The sums are
> encoded in the subscription userdata sent during the JoinGroup phase of a
> rebalance.
> The problem is that the instance can rejoin the group after it has been
> assigned a new task but before that task is initialized. Because the task is
> already assigned, the instance does not compute its offset sum from the
> checkpoint file but from the uninitialized task instead, and as a result it
> reports no offsets for that task at all.
> This results in a particularly nefarious interaction between HA and
> cooperative rebalancing. An instance may read from the checkpoint file of a
> caught-up (but unassigned) task and report this in its subscription, leading
> the assignor to compute a small lag and place this task on the instance.
> After placing all stateful tasks in this way, the assignor distributes the
> stateless tasks across the group to balance the overall workload. It does
> this without considering the previous owners of the stateless tasks, so
> moving the stateful task to this instance will likely also change which
> stateless tasks each instance receives in this rebalance.
> Any time owned tasks are moved, their current owner has to revoke them and
> trigger a follow-up cooperative rebalance. Within the Consumer client, this
> actually happens immediately: within a single invocation of poll(), the
> consumer loops inside joinGroupIfNeeded() as long as a rejoin is needed, and
> at the end of a rebalance in which any partitions were revoked, a rejoin is
> indeed needed. So the Consumer sends out its next JoinGroup, including the
> userdata with the computed task offset sums, without first returning from
> the current poll(). Streams never gets the chance to initialize its new
> tasks, and ends up excluding them from the offset sums it reports in the
> following rebalance.
> And since it doesn't report any offsets for this task, the assignor now
> believes the instance does _not_ have any caught-up state for this task, and
> assigns the task elsewhere. This shuffles the stateless tasks once more,
> which in turn triggers another cooperative rebalance. This time the task is
> no longer assigned to the instance, so it reports offsets based on the
> checkpoint file again, and we're back at the beginning.
> Because the assignment is deterministic, once a group is caught in this
> cycle it cannot escape without manual intervention.
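
The two-way offset-sum computation described in the issue can be sketched as a
simplified Java model. Everything here is hypothetical: OwnedTask,
buggyOffsetSums, fixedOffsetSums and the changelog partition names are
illustrative stand-ins, not the Kafka Streams TaskManager API. The fallback in
fixedOffsetSums is one plausible remedy, not necessarily the fix committed for
this issue.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OffsetSumSketch {
    // Hypothetical stand-in for a task owned by this instance;
    // NOT the real Kafka Streams Task interface.
    static final class OwnedTask {
        final boolean initialized;
        final Map<String, Long> changelogOffsets;

        OwnedTask(boolean initialized, Map<String, Long> changelogOffsets) {
            this.initialized = initialized;
            this.changelogOffsets = changelogOffsets;
        }
    }

    static long sum(Map<String, Long> offsets) {
        return offsets.values().stream().mapToLong(Long::longValue).sum();
    }

    // Models the behavior in the issue: owned tasks are summed from their live
    // changelog offsets, unowned tasks from the checkpoint file. An owned task
    // that is not yet initialized contributes nothing, even if a checkpoint exists.
    static Map<String, Long> buggyOffsetSums(Map<String, OwnedTask> owned,
                                             Map<String, Map<String, Long>> checkpoints) {
        Map<String, Long> sums = new HashMap<>();
        for (Map.Entry<String, OwnedTask> e : owned.entrySet()) {
            if (e.getValue().initialized) {
                sums.put(e.getKey(), sum(e.getValue().changelogOffsets));
            }
        }
        for (Map.Entry<String, Map<String, Long>> e : checkpoints.entrySet()) {
            if (!owned.containsKey(e.getKey())) {
                sums.put(e.getKey(), sum(e.getValue()));
            }
        }
        return sums;
    }

    // Hypothetical remedy: fall back to the checkpoint file whenever an owned
    // task has not been initialized yet.
    static Map<String, Long> fixedOffsetSums(Map<String, OwnedTask> owned,
                                             Map<String, Map<String, Long>> checkpoints) {
        Set<String> ids = new HashSet<>(owned.keySet());
        ids.addAll(checkpoints.keySet());
        Map<String, Long> sums = new HashMap<>();
        for (String id : ids) {
            OwnedTask task = owned.get(id);
            if (task != null && task.initialized) {
                sums.put(id, sum(task.changelogOffsets));
            } else if (checkpoints.containsKey(id)) {
                sums.put(id, sum(checkpoints.get(id)));
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Task 0_1 was just assigned, but the rejoin happened before it was initialized.
        Map<String, OwnedTask> owned = Map.of("0_1", new OwnedTask(false, Map.of()));
        Map<String, Map<String, Long>> checkpoints =
                Map.of("0_1", Map.of("changelog-0", 100L));
        System.out.println(buggyOffsetSums(owned, checkpoints)); // {}
        System.out.println(fixedOffsetSums(owned, checkpoints)); // {0_1=100}
    }
}
```

In the buggy variant the caught-up checkpoint for 0_1 is ignored simply
because the task already sits in the owned map.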
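
The feedback loop can also be reduced to a toy simulation with two instances,
A and B, and a single stateful task. This sketch assumes an assignor that
places the task on A only when A's reported offset sum is within a
hypothetical lag threshold (loosely analogous to the acceptable.recovery.lag
config introduced by KIP-441), and, as the issue describes, that every move of
the task triggers a follow-up rebalance before A can initialize it. The class,
method and constant names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public class RebalanceCycleSketch {
    static final long END_OFFSET = 100L;     // hypothetical changelog end offset
    static final long ACCEPTABLE_LAG = 10L;  // hypothetical lag threshold

    // Simplified assignor: place the task on A only if A reported a caught-up
    // offset sum; otherwise place it on B.
    static String assign(OptionalLong reportedByA) {
        if (reportedByA.isPresent()
                && END_OFFSET - reportedByA.getAsLong() <= ACCEPTABLE_LAG) {
            return "A";
        }
        return "B";
    }

    // What A reports for the task in its JoinGroup subscription. If A owns the
    // task, the rejoin happened inside the same poll(), so the task is still
    // uninitialized and nothing is reported; otherwise A reads its checkpoint.
    static OptionalLong reportFromA(String currentOwner, long checkpointedOffset) {
        if ("A".equals(currentOwner)) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(checkpointedOffset);
    }

    // Runs the given number of rebalances and records the task owner after each.
    static List<String> simulate(int rounds, long checkpointedOffset) {
        List<String> owners = new ArrayList<>();
        String owner = "B"; // the task starts out on B
        for (int i = 0; i < rounds; i++) {
            owner = assign(reportFromA(owner, checkpointedOffset));
            owners.add(owner);
        }
        return owners;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, 100L)); // [A, B, A, B, A, B]
    }
}
```

Because each step depends only on the previous owner, the model never
settles: with a caught-up checkpoint the owner alternates forever, while a
lagging checkpoint (say, offset 50) keeps the task on B and no cycle occurs.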



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
