[
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801927#comment-17801927
]
A. Sophie Blee-Goldman commented on KAFKA-16025:
------------------------------------------------
Nice find and writeup of the race condition – I'll take a look at the fix
> Streams StateDirectory has orphaned locks after rebalancing, blocking future
> rebalancing
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Environment: Linux
> Reporter: Sabit
> Priority: Major
>
> Hello,
>
> We are encountering an issue where during rebalancing, we see streams threads
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that
> some tasks were having issues initializing due to failure to grab a lock in
> the StateDirectory:
>
> {{2023-12-14 22:51:57.352000Z stream-thread
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51]
> Failed to lock the state directory for task 0_51; will retry}}
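The "will retry" in that log can be modeled as a bounded tryLock loop that never succeeds while another thread still holds the lock. A minimal sketch, assuming a shared owner map akin to StateDirectory's lock tracker (class and method names here are illustrative, not the actual Kafka Streams internals):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative model of the retry-on-lock-failure behavior in the log above;
// LockRetryDemo and its methods are hypothetical names, not Kafka classes.
public class LockRetryDemo {
    // Shared per-process map from task id to owning thread name.
    static final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    static boolean tryLock(String taskId, String owner) {
        // putIfAbsent returns null when the lock was free; re-entrant for the owner.
        String prev = locks.putIfAbsent(taskId, owner);
        return prev == null || prev.equals(owner);
    }

    static boolean initializeTask(String taskId, String owner, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryLock(taskId, owner)) {
                return true;
            }
            System.out.println("Failed to lock the state directory for task "
                    + taskId + "; will retry");
        }
        // The lock is held by another thread, so the task never initializes.
        return false;
    }

    public static void main(String[] args) {
        // Simulate a lock left behind by another thread.
        locks.put("0_51", "StreamThread-1");
        boolean ok = initializeTask("0_51", "StreamThread-14", 3);
        System.out.println("initialized: " + ok); // prints false
    }
}
```

If the holder never releases the lock, as in the scenario below, the retry loop in the real code spins indefinitely.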
>
> We were able to reproduce this behavior reliably on 3.4.0. This is the
> sequence that triggers the bug.
> Assume a streams consumer group with 5 instances (A, B, C, D, E), each
> running 5 threads (1-5), where the application uses stateful tasks with
> state stores on disk. There are 10 active tasks and 10 standby tasks.
> # Instance A is deactivated
> # As an example, let's say task 0_1, previously on instance B, moves to
> instance C
> # Task 0_1 leaves behind its state directory on Instance B's disk,
> currently unused, and no lock for it exists in Instance B's StateDirectory
> in-memory lock tracker
> # Instance A is re-activated
> # Streams thread 1 on Instance B is asked to re-join the consumer group due
> to a new member being added
> # As part of re-joining, thread 1 lists non-empty state directories in order
> to report the offsets it has in its state stores as part of its metadata.
> Thread 1 sees that the directory for 0_1 is not empty.
> # The cleanup thread on instance B runs. The cleanup thread locks state
> store 0_1, sees the directory for 0_1 was last modified more than
> `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
> # Thread 1 takes a lock on directory 0_1 because it was found non-empty
> earlier, unaware that the cleanup thread has run between the time of the
> check and the lock. It tracks this lock in its own in-memory store, in
> addition to StateDirectory's in-memory lock store
> # Thread 1 successfully joins the consumer group
> # After every consumer in the group joins the group, assignments are
> calculated, and then every consumer calls sync group to receive the new
> assignments
> # Thread 1 on Instance B calls sync group but gets an error - the group
> coordinator has triggered a new rebalance and all members must rejoin the
> group
> # Thread 1 again lists non-empty state directories in order to report the
> offsets it has in its state stores as part of its metadata. Prior to doing
> so, it clears its in-memory store tracking the locks it had taken for the
> purpose of gathering rebalance metadata
> # Thread 1 no longer takes a lock on 0_1, as its directory is now empty
> (the cleanup thread deleted it)
> # However, the lock on 0_1 owned by Thread 1 remains in the StateDirectory
> # All consumers re-join and sync successfully, receiving their new
> assignments
> # Thread 2 on Instance B is assigned task 0_1
> # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is
> still being held by Thread 1
> # Thread 2 remains in the rebalancing state and cannot make progress on
> task 0_1, or on any of its other assigned tasks.
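The steps above boil down to thread 1 clearing only its local bookkeeping while the shared lock entry survives. A minimal sketch of that orphaned-lock sequence, assuming a ConcurrentHashMap-based owner map (the class and field names are illustrative, not the actual Kafka Streams implementation):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the orphaned-lock race; OrphanedLockDemo and its
// members are hypothetical names, not actual Kafka Streams internals.
public class OrphanedLockDemo {
    // Shared per-process lock tracker, akin to StateDirectory's task-to-owner map.
    static final ConcurrentMap<String, String> sharedLocks = new ConcurrentHashMap<>();

    static boolean tryLock(String taskId, String owner) {
        // putIfAbsent returns null when the lock was free; re-entrant for the owner.
        String prev = sharedLocks.putIfAbsent(taskId, owner);
        return prev == null || prev.equals(owner);
    }

    public static void main(String[] args) {
        // Thread 1 locks 0_1 while gathering rebalance metadata and records
        // the lock in its own local set.
        Set<String> thread1Local = new HashSet<>();
        if (tryLock("0_1", "Thread-1")) {
            thread1Local.add("0_1");
        }

        // Rebalance retries: thread 1 clears only its LOCAL bookkeeping, so
        // the entry in the shared map is never released -- the orphaned lock.
        thread1Local.clear();

        // Thread 2 is later assigned 0_1 but can never acquire the lock.
        boolean acquired = tryLock("0_1", "Thread-2");
        System.out.println("Thread-2 acquired lock: " + acquired); // prints false
    }
}
```

In this model the fix would be to release the shared entries before clearing the local set; the actual patch for the race is discussed in the linked PR.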
--
This message was sent by Atlassian Jira
(v8.20.10#820010)