[
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761
]
Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM:
----------------------------------------------------------
I think this bug can occur when a rebalance is triggered while the cleanup thread is
running, leading to multiple calls to
[tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180].
`tryToLockAllNonEmptyTaskDirectories` does not check whether a directory became
empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so it can end up
locking an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed
to run before the next invocation of `tryToLockAllNonEmptyTaskDirectories`, which
starts by clearing `lockedTaskDirectories`. Because the directory is now empty, it is
not listed again and not re-inserted into `lockedTaskDirectories`, leaving an orphaned
lock held by this thread in the `StateDirectory`.
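To make the interleaving concrete, here is a small, self-contained toy replay of the suspected race (plain Java, run sequentially for determinism). All names here, including the collections standing in for the `StateDirectory` lock table, are illustrative stand-ins rather than the real TaskManager/StateDirectory API:
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy, sequential replay of the suspected interleaving; not the real Streams code.
public class OrphanedLockSketch {
    static final List<String> nonEmptyTaskDirs = new ArrayList<>(List.of("0_51"));
    static final Map<String, String> stateDirectoryLocks = new HashMap<>(); // taskId -> owning thread
    static final Set<String> lockedTaskDirectories = new HashSet<>();       // the stream thread's own view

    static void tryToLockAllNonEmptyTaskDirectories(String owner, Runnable betweenListAndLock) {
        lockedTaskDirectories.clear();                        // prior view is dropped; locks are NOT released
        List<String> listed = List.copyOf(nonEmptyTaskDirs);  // ~ listNonEmptyTaskDirectories()
        betweenListAndLock.run();                             // window in which the cleanup thread can run
        for (String taskId : listed) {
            // ~ lock(): re-entrant for the owner, succeeds even though the directory is now empty
            if (owner.equals(stateDirectoryLocks.computeIfAbsent(taskId, t -> owner))) {
                lockedTaskDirectories.add(taskId);
            }
        }
    }

    public static void main(String[] args) {
        // First rebalance: the cleanup thread empties 0_51 between listing and locking.
        tryToLockAllNonEmptyTaskDirectories("StreamThread-21", () -> nonEmptyTaskDirs.remove("0_51"));
        System.out.println("1st call: view=" + lockedTaskDirectories + " locks=" + stateDirectoryLocks);
        // Second rebalance: 0_51 is no longer listed, the view is cleared, and the lock is
        // never released -> orphaned lock; any thread later assigned 0_51 fails to acquire it.
        tryToLockAllNonEmptyTaskDirectories("StreamThread-21", () -> { });
        System.out.println("2nd call: view=" + lockedTaskDirectories + " locks=" + stateDirectoryLocks);
    }
}
{code}
The second call leaves `lockedTaskDirectories` empty while the lock table still maps 0_51 to StreamThread-21, which matches the heap dump described in the issue.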
Logs that correspond to this:
- CleanupThread starts on `i-0f1a5e7a42158e04b`
{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed
(cleanup delay is 600000ms).}}
- New member joins the consumer group
{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown
member id joins group 0.agg in Stable state. Created a new member id
i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and
add to the group.}}
- Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread
is deleting directories
- This is where Thread-21 and the CleanupThread race to lock the same
directories, reaching a state where Thread-21 believed it locked non-empty
directories when they had actually been emptied by the cleanup thread. The only
thing that doesn't line up here is that 0_37 was deleted before the stream
thread began re-joining, but the timeline of the other deletions aligns well:
{{2023-12-13 22:57:17.980000Z stream-thread
[i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for
task 0_37 as 653970ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request
joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg]
(Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg
generation 122096 (__consumer_offsets-7)}}
- Now, if
[TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199]
is called once this state is reached, the tasks whose directories are now empty will
get unlocked as expected. We would see this with the log ["Adding newly assigned
partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333]
once the sync group request succeeds following a successful join request. However,
the sync group request fails for this thread because another rebalance is triggered:
{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance
group 0.agg in state PreparingRebalance with old generation 122096
(__consumer_offsets-7) (reason: Updating metadata for member
i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{{}2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg]
Successfully joined group with generation Generation{generationId=122096,
memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f',
protocol='stream'{}}}}
{{{}2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] SyncGroup
failed: The group began another rebalance. Need to re-join the group. Sent
generation was Generation{generationId=122096,
memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f',
protocol='stream'{}}}}
{{2023-12-13 22:57:22.233000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request
joining group due to: rebalance failed due to 'The group is rebalancing, so a
rejoin is needed.' (RebalanceInProgressException)}}
{{2023-12-13 22:57:22.233000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21,
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg]
(Re-)joining group}}
- Now the next join request executes `tryToLockAllNonEmptyTaskDirectories`
again, clearing the prior contents of `lockedTaskDirectories` without realizing it
held locks that cannot be re-acquired from the directory listing, because those
directories have since become empty.
I still need to reproduce this issue to confirm it, but the fix is rather simple:
in `tryToLockAllNonEmptyTaskDirectories`, check whether the directory is still
non-empty after taking the lock, and if it has become empty, unlock it (sketched
below).
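Continuing the toy sketch above (same illustrative names, added to the same class; not the real API), the fix would look roughly like this: re-check emptiness after the lock is acquired and release the lock if the cleanup thread emptied the directory in the meantime.
{code:java}
// Continuation of the toy model above; the emptiness re-check after locking is the proposed fix.
static void tryToLockAllNonEmptyTaskDirectoriesFixed(String owner, Runnable betweenListAndLock) {
    lockedTaskDirectories.clear();
    List<String> listed = List.copyOf(nonEmptyTaskDirs);
    betweenListAndLock.run();
    for (String taskId : listed) {
        if (owner.equals(stateDirectoryLocks.computeIfAbsent(taskId, t -> owner))) {
            if (!nonEmptyTaskDirs.contains(taskId)) {          // directory became empty after listing
                stateDirectoryLocks.remove(taskId, owner);     // ~ unlock(): no orphaned lock left behind
            } else {
                lockedTaskDirectories.add(taskId);
            }
        }
    }
}
{code}
Running the same two-rebalance scenario through this version leaves no entry in the lock table after the first call, so a later assignment of 0_51 to another thread would not block.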
> Streams StateDirectory has orphaned locks after rebalancing, blocking future
> rebalancing
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Environment: Linux
> Reporter: Sabit
> Priority: Major
> Attachments: Screenshot 1702750558363.png
>
>
> Hello,
>
> We are encountering an issue where during rebalancing, we see streams threads
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that
> some tasks were having issues initializing due to failure to grab a lock in
> the StateDirectory:
>
> {{2023-12-14 22:51:57.352000Z stream-thread
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51]
> Failed to lock the state directory for task 0_51; will retry}}
>
> Reading the comments for TaskManager, this seems like an expected case; however,
> in our situation the thread would get stuck on this permanently.
> After looking at the logs, we came to understand that whenever tasks 0_51,
> 0_37, 0_107, 0_219, or 0_93 were being assigned to this client, the assigned
> threads would get stuck due to being unable to grab the lock. We took a heap
> dump of this JVM and found that all of these tasks were locked by
> StreamThread-21 (see attachment). Additionally, each of these task
> directories exists on the client but is empty.
>
> The sequence of events that occurred for us to arrive at this state is that
> initially, all of these tasks were being processed on the impacted client,
> either as active or standby tasks. We had one client drop out of the consumer
> group, so these tasks were rebalanced away from the client. When we try to
> bring up a new client to replace the one that dropped out, the impacted
> client cannot initialize these 5 tasks it was initially processing. Sample of
> one timeline:
>
> {{# Task moved away from the original consumer thread}}
> {{2023-12-13 22:45:58.240000Z stream-thread
> [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
> {{2023-12-13 22:45:58.263000Z stream-thread
> [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
> {{# Directory cleaned up}}
> {{2023-12-13 22:57:18.696000Z stream-thread
> [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51
> for task 0_51 as 680455ms has elapsed (cleanup delay is 600000ms).}}
> {{# Cannot initialize task when it is re-assigned to this client}}
> {{2023-12-14 22:51:57.352000Z stream-thread
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51]
> Failed to lock the state directory for task 0_51; will retry}}
>
> Reading through the StateDirectory code, it wasn't immediately obvious how we
> could arrive at a situation where a task is locked by a thread it had not yet
> been assigned to, while its directory was cleaned up but left empty rather
> than deleted. We didn't observe any filesystem issues on this client around
> this time either.
>