[
https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
John Roesler updated KAFKA-10247:
---------------------------------
Priority: Blocker (was: Major)
> Streams may attempt to process after closing a task
> ---------------------------------------------------
>
> Key: KAFKA-10247
> URL: https://issues.apache.org/jira/browse/KAFKA-10247
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Blocker
>
> Observed in a system test. A corrupted task was detected, and Streams properly
> closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records
> from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it
> is likely that the consumer's position has fallen out of the topic partition
> offset range because the topic was truncated or compacted on the broker,
> marking the corresponding tasks as corrupted and re-initializing it later.
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position
> FetchPosition{offset=1, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack:
> null)], epoch=0}} is out of range for partition
> SmokeTest-cntStoreName-changelog-1
> at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
> at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the
> states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted.
> Will close the task as dirty and re-create and bootstrap from scratch.
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs
> {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to
> be re-initialized
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch
> position FetchPosition{offset=1, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack:
> null)], epoch=0}} is out of range for partition
> SmokeTest-cntStoreName-changelog-1
> at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
> at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
> ... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
> Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
> Closing its state manager and all the registered state stores:
> {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 :
> SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null,
> cntStoreName=StateStoreMetadata (cntStoreName :
> SmokeTest-cntStoreName-changelog-1 @ 0}
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-07-08 17:08:09,346] INFO [Consumer
> clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer,
> groupId=null] Subscribed to partition(s):
> SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2,
> SmokeTest-sum-STATE-STORE-0000000050-changelog-0,
> SmokeTest-minStoreName-changelog-3,
> SmokeTest-sum-STATE-STORE-0000000050-changelog-2,
> SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0,
> SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2,
> SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-07-08 17:08:09,348] DEBUG stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released
> state dir lock for task 2_1
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2020-07-08 17:08:09,348] INFO stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
> Closing record collector dirty
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> [2020-07-08 17:08:09,348] INFO stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
> Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask){code}
> However, records were already buffered for that task, so later in the same
> processing loop, Streams tried to process it anyway, resulting in an
> InvalidStateStoreException:
> {code:java}
> [2020-07-08 17:08:09,352] ERROR stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to
> process stream task 2_1 due to the following error:
> (org.apache.kafka.streams.processor.internals.TaskManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store
> cntStoreName is currently closed.
> at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
> at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
> at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] ERROR stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered
> the following exception during processing and the thread is going to shut
> down: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store
> cntStoreName is currently closed.
> at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
> at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
> at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] INFO stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State
> transition from RUNNING to PENDING_SHUTDOWN
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-07-08 17:08:09,352] INFO stream-thread
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down
> (org.apache.kafka.streams.processor.internals.StreamThread){code}
> This caused the entire thread to shut down.
>
> Instead, we should not attempt to process tasks that are not running.
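> The proposed guard can be sketched as follows. This is a minimal illustrative
> simplification, not the actual Kafka Streams internals: the {{Task}} class,
> {{State}} enum, and {{processAll}} loop below are hypothetical stand-ins for
> {{StreamTask}}, the task state machine, and {{TaskManager#process}}.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: skip tasks that were closed dirty (e.g. after a
// TaskCorruptedException) even if records were already buffered for them.
public class TaskGuardSketch {

    enum State { RUNNING, CLOSED }

    static class Task {
        final String id;
        State state = State.RUNNING;
        int buffered; // records fetched before the task was closed

        Task(String id, int buffered) {
            this.id = id;
            this.buffered = buffered;
        }

        void closeDirty() { state = State.CLOSED; }

        void process() {
            if (state != State.RUNNING) {
                // Without the guard in processAll, this is where an
                // exception like InvalidStateStoreException would surface.
                throw new IllegalStateException("store closed for task " + id);
            }
            buffered--;
        }
    }

    // Processing loop: only touch tasks that are still RUNNING, even if
    // they have buffered records left over from before they were closed.
    static List<String> processAll(List<Task> tasks) {
        List<String> processed = new ArrayList<>();
        for (Task task : tasks) {
            if (task.state != State.RUNNING) {
                continue; // closed dirty; its buffered records are dropped
            }
            while (task.buffered > 0) {
                task.process();
            }
            processed.add(task.id);
        }
        return processed;
    }

    public static void main(String[] args) {
        Task healthy = new Task("1_0", 2);
        Task corrupted = new Task("2_1", 3); // records already buffered
        corrupted.closeDirty();              // detected as corrupted, closed dirty
        System.out.println(processAll(List.of(healthy, corrupted))); // prints [1_0]
    }
}
```

> With the guard, the corrupted task's buffered records are simply dropped and
> replayed after the task is re-created and restored from its changelog, rather
> than killing the stream thread.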
--
This message was sent by Atlassian Jira
(v8.3.4#803005)