[ https://issues.apache.org/jira/browse/KAFKA-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman reassigned KAFKA-10651:
----------------------------------------------

    Assignee: A. Sophie Blee-Goldman

> Assignor reports offsets from uninitialized task
> ------------------------------------------------
>
>                 Key: KAFKA-10651
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10651
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.7.0, 2.6.1
>
>
> In KIP-441, the new HA assignor makes an informed decision about stateful
> task placement based on the offset sums reported by each instance. Offset
> sums are computed in one of two ways: for assigned tasks (i.e. those in the
> TaskManager's "tasks" map), it will just sum up the tasks' changelog offsets
> directly. For tasks that are not assigned but whose directory remains on
> disk, it reads the changelog offsets from the checkpoint file. This is
> encoded in the subscription userdata sent during the JoinGroup phase of a
> rebalance.
>
> The problem here is that it's possible for the instance to rejoin the group
> after having been assigned a new task, but before that task is initialized.
> In this case it would not compute the offset sum from the checkpoint file but
> instead from the uninitialized task, causing it to skip reporting any offsets
> for that task whatsoever.
>
> This results in a particularly nefarious interaction between HA and
> cooperative rebalancing. An instance may read from the checkpoint file of a
> caught-up (but unassigned) task and report this in its subscription, leading
> the assignor to compute a small lag and place this task on the instance.
> After placing all stateful tasks in this way, it will distribute the
> stateless tasks across the group to balance the overall workload. It does
> this without considering the previous owner of the stateless tasks, so odds
> are good that moving the stateful task to this instance will result in a
> different assortment of stateless tasks in this rebalance.
>
> Any time owned tasks are moved around, the current owner will have to revoke
> them and trigger a follow-up cooperative rebalance. Within the Consumer
> client, this actually happens immediately: that is, within an invocation of
> poll() it will loop inside joinGroupIfNeeded() as long as a rejoin is needed.
> And at the end of the last rebalance, if any partitions are revoked then a
> rejoin will indeed be needed. So the Consumer will send out its next
> JoinGroup (including the userdata with computed task offset sums) without
> first exiting from the current poll(). Streams never gets the chance to
> initialize its new tasks, and ends up excluding them from the offset sums it
> reports in the following rebalance.
>
> And since it doesn't report any offsets for this task, the assignor now
> believes the instance does _not_ have any caught-up state for this task, and
> assigns the task elsewhere. This causes a shuffling of stateless tasks once
> more, which in turn results in another cooperative rebalance. This time the
> task is no longer assigned, so the instance reports offsets based on the
> checkpoint file again, and we're back at the beginning.
>
> Given the deterministic assignment, once a group is caught up in this cycle
> it will be impossible to escape it without manual intervention.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
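
For illustration only, the two offset-sum paths described in the issue can be modeled with a small toy sketch in Python. This is not Kafka Streams code: `Task`, `compute_offset_sums`, the task id `0_1`, the changelog name, and the offset value are all hypothetical, and the real logic lives in the Java TaskManager. The sketch only shows why an assigned-but-uninitialized task drops out of the report while the same task, once unassigned, reappears from its checkpoint.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Task:
    """Hypothetical stand-in for an assigned task (not the Streams class)."""
    task_id: str
    initialized: bool = False
    changelog_offsets: Dict[str, int] = field(default_factory=dict)


def compute_offset_sums(
    assigned: Dict[str, Task],
    checkpoints: Dict[str, Dict[str, int]],
) -> Dict[str, int]:
    """Toy model of the reporting behavior described in the issue."""
    sums: Dict[str, int] = {}

    # Path 1: assigned tasks report the sum of their own changelog offsets.
    # An uninitialized task has nothing to sum, so it is skipped entirely.
    for task_id, task in assigned.items():
        if task.initialized:
            sums[task_id] = sum(task.changelog_offsets.values())

    # Path 2: unassigned tasks with state left on disk report the offsets
    # recorded in their checkpoint file.
    for task_id, offsets in checkpoints.items():
        if task_id not in assigned:
            sums[task_id] = sum(offsets.values())

    return sums


# Assumed on-disk state: one caught-up task directory with a checkpoint.
checkpoints = {"0_1": {"store-changelog-0": 100}}

# Rebalance N: task is unassigned, so the checkpoint is reported.
before = compute_offset_sums({}, checkpoints)

# Rebalance N+1: task was just assigned but never initialized, because the
# rejoin happened inside the same poll(); nothing is reported for it.
during = compute_offset_sums({"0_1": Task("0_1")}, checkpoints)

# Rebalance N+2: the assignor moved the task away, so the checkpoint is
# reported again and the cycle restarts.
after = compute_offset_sums({}, checkpoints)
```

In this model `before` and `after` both contain an entry for `0_1` while `during` is empty, which mirrors the alternating reports that keep the group stuck in the rebalance cycle.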