Rohan Desai created KAFKA-16876:
-----------------------------------
Summary: TaskManager.handleRevocation doesn't handle errors thrown
from task.prepareCommit
Key: KAFKA-16876
URL: https://issues.apache.org/jira/browse/KAFKA-16876
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.0
Reporter: Rohan Desai
`TaskManager.handleRevocation` does not handle exceptions thrown by
`task.prepareCommit`. In the instance I observed, `prepareCommit`
flushed caches, which led to downstream `producer.send` calls that threw a
`TaskMigratedException`. As a result, the tasks that need to be revoked are
not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown
exception and then moves on to the other task assignment callbacks. One of
these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks and
raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if
the clean close fails, so we don't leak any tasks. I think there may be two
bugs here:
# `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It
should try not to leave any revoked tasks in an unsuspended state.
# The `ConsumerCoordinator` just throws the first exception that it sees. But
it seems bad to throw the `TaskMigratedException` and drop the
`IllegalStateException` (though in this case I think it's relatively benign). I
think on `IllegalStateException` we really want the streams thread to exit. One
idea here is to have `ConsumerCoordinator` throw an exception type that
includes the other exceptions that it has seen in another field. But this
breaks the contract for clients that catch specific exceptions. I'm not sure of
a clean solution, but I think it's at least worth recording that it would be
preferable to have the caller of `poll` handle all the thrown exceptions rather
than just the first one.
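For (2), one option that might avoid a new exception type is to keep throwing the first exception and attach the later ones with `Throwable.addSuppressed`, so callers that catch a specific type still see it. A minimal sketch, assuming nothing about how `ConsumerCoordinator` is actually structured; `CallbackErrorsSketch` and `runAll` are hypothetical names, and the callbacks are plain `Runnable`s rather than the real rebalance callbacks:

```java
import java.util.List;

// Hypothetical helper, not part of kafka-clients.
final class CallbackErrorsSketch {

    // Runs every callback even if an earlier one failed. If any failed, the
    // first failure is rethrown with its original type, and every later
    // failure is attached to it as a suppressed exception.
    static void runAll(List<Runnable> callbacks) {
        RuntimeException first = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else if (e != first) {
                    // addSuppressed rejects self-suppression, hence the guard.
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

A caller that catches the first exception could then inspect `getSuppressed()` to see whether something like an `IllegalStateException` was also raised. Whether that is enough to make the streams thread exit on `IllegalStateException` is a separate question.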
Here is the IllegalStateException stack trace I observed:
{code:java}
[ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556
[e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager -
stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1]
Failed to close task 0_3 cleanly. Attempting to close remaining
tasks before re-throwing:
[ 508.535] [service_application2] [inf] java.lang.IllegalStateException:
Illegal state RUNNING while closing active task 0_3
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
[kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94)
[kafka-client-0.24.0-dc9acd1.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
[kafka-streams-3.6.0.jar:?] {code}
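To illustrate (1), here is a rough sketch of a revocation loop that suspends every revoked task even when `prepareCommit` throws. `SketchTask`, `RecordingTask` and `RevocationSketch` are hypothetical stand-ins, not the real Streams classes, and the real `handleRevocation` does more than this (for example committing offsets), which is left out here:

```java
import java.util.List;

// Hypothetical stand-in for a Streams task; NOT the real Task interface.
interface SketchTask {
    void prepareCommit(); // may throw, e.g. when a cache flush hits a failing send
    void suspend();
}

// Test double: optionally fails in prepareCommit and records suspension.
final class RecordingTask implements SketchTask {
    private final boolean failOnPrepare;
    boolean suspended = false;

    RecordingTask(boolean failOnPrepare) {
        this.failOnPrepare = failOnPrepare;
    }

    @Override
    public void prepareCommit() {
        if (failOnPrepare) {
            throw new RuntimeException("simulated TaskMigratedException");
        }
    }

    @Override
    public void suspend() {
        suspended = true;
    }
}

final class RevocationSketch {

    // Attempts prepareCommit on each revoked task, then suspends the task
    // whether or not prepareCommit succeeded. The first failure is rethrown
    // once every task has been visited; later failures are attached to it
    // as suppressed exceptions.
    static void handleRevocation(List<? extends SketchTask> revokedTasks) {
        RuntimeException first = null;
        for (SketchTask task : revokedTasks) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
            try {
                task.suspend();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Keeps the first failure and attaches any later, distinct one to it.
    private static RuntimeException record(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        if (next != first) {
            first.addSuppressed(next);
        }
        return first;
    }
}
```

With one failing and one healthy `RecordingTask`, the call still throws, but both tasks end up with `suspended` set, so a later close would not find a task still in `RUNNING`. Whether a task whose `prepareCommit` failed should instead be closed dirty right away is a design choice this sketch does not settle.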