[ https://issues.apache.org/jira/browse/KAFKA-9972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104766#comment-17104766 ]

Guozhang Wang commented on KAFKA-9972:
--------------------------------------

Since, at least for now, the task-manager (and therefore the tasks it contains) is processed single-threaded, the task state should not change during steps 1/2/3 above, so I think just checking the state of the task is fine.

This is also because today we have three steps, as John listed above. If we can actually merge them into one call as part of the cleanup (which I think may be possible, as we discussed in the other PR), then I'm more inclined to Sophie's proposal: let the task itself decide whether it should commit, without exposing its state to the task-manager. (I know that even today there are other places where the task-manager checks the state of individual tasks, but personally I think it's better if we can capture them all inside the task itself... just my preference.)

> Corrupted standby task could be committed
> -----------------------------------------
>
>                 Key: KAFKA-9972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9972
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> A corrupted standby task could revive and transition to the CREATED state, which
> will then be picked up by `taskManager.commitAll` in the next runOnce, causing an
> illegal state:
> ```
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Detected the states of tasks {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]} are corrupted and hence needs to be re-initialized
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] [Consumer clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Prepared dirty close (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,679] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Closed dirty (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,751] ERROR [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) java.lang.IllegalStateException: Illegal state CREATED while preparing standby task 1_1 for committing
>         at org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,751] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> ```
> Two solutions here: either we deprecate `commitAll` and always enforce a state
> check to selectively commit tasks, or we enforce a state check inside the standby
> task's `commitNeeded` call. Added a fix for option one here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
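
For readers comparing the two options discussed in this thread, here is a minimal, self-contained sketch. It is not the Kafka Streams code: `CommitSketch`, `SketchTask`, the reduced `State` enum, the `dirty` flag, and the `commitRunning` / `commitIfNeeded` helpers are all hypothetical names made up for illustration. Only the idea is taken from the thread: either the manager filters tasks by state before committing, or the task's own `commitNeeded` check accounts for its state.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitSketch {
    // Hypothetical, simplified set of task states (an assumption for this sketch).
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Hypothetical stand-in for a standby task.
    static class SketchTask {
        final String id;
        State state;
        boolean dirty; // stands in for "has progress worth committing"

        SketchTask(String id, State state, boolean dirty) {
            this.id = id;
            this.state = state;
            this.dirty = dirty;
        }

        // Task-side check: the task decides for itself, so its state
        // never has to be exposed to the manager.
        boolean commitNeeded() {
            return state == State.RUNNING && dirty;
        }

        // Mirrors the failure in the log: committing from a non-RUNNING state is illegal.
        void prepareCommit() {
            if (state != State.RUNNING) {
                throw new IllegalStateException(
                    "Illegal state " + state + " while preparing standby task " + id + " for committing");
            }
            dirty = false;
        }
    }

    // Manager-side check: the manager inspects each task's state and commits selectively.
    static List<String> commitRunning(List<SketchTask> tasks) {
        List<String> committed = new ArrayList<>();
        for (SketchTask task : tasks) {
            if (task.state == State.RUNNING) {
                task.prepareCommit();
                committed.add(task.id);
            }
        }
        return committed;
    }

    // Task-side variant: the manager only asks each task whether a commit is needed.
    static List<String> commitIfNeeded(List<SketchTask> tasks) {
        List<String> committed = new ArrayList<>();
        for (SketchTask task : tasks) {
            if (task.commitNeeded()) {
                task.prepareCommit();
                committed.add(task.id);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        // Task 1_1 plays the revived standby task that is back in CREATED.
        List<SketchTask> first = List.of(
            new SketchTask("0_0", State.RUNNING, true),
            new SketchTask("1_1", State.CREATED, true));
        System.out.println(commitRunning(first));   // prints [0_0]

        List<SketchTask> second = List.of(
            new SketchTask("0_0", State.RUNNING, true),
            new SketchTask("1_1", State.CREATED, true));
        System.out.println(commitIfNeeded(second)); // prints [0_0]
    }
}
```

In both variants the revived task in CREATED is skipped instead of hitting the `IllegalStateException`. The difference is only where the knowledge of task state lives, which is the design preference raised in the comment above.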