[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christo Lolov updated KAFKA-14567:
----------------------------------
Fix Version/s: (was: 4.2.0)
> Kafka Streams crashes after ProducerFencedException
> ---------------------------------------------------
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer, streams
> Affects Versions: 3.7.0
> Reporter: Matthias J. Sax
> Assignee: Kirk True
> Priority: Blocker
> Labels: eos, transactions
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced
> thread crashes with a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> Caused by: java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> ... 6 more
> {quote}
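> It seems we call `send()` after the producer has been fenced: the trace
> shows `KafkaProducer.send()` trying to move the `TransactionManager` from
> `FATAL_ERROR` to `ABORTABLE_ERROR`, which is an invalid transition. Once a
> producer is fenced, we should close all tasks dirty and try to rejoin the
> group, and we should not call `send()` on the already-fenced producer.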
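A minimal, self-contained Java sketch of the failure mode and of the guard suggested in the description. It does not use the Kafka client: `TxnState`, `TxnStateModel`, `GuardedSender` and `FencingDemo` are hypothetical stand-ins, and the transition table is a simplified assumption that only mirrors the state names in the log (the real `TransactionManager` has more states and different rules). It shows that once fencing moves the model to `FATAL_ERROR`, a later error path that tries to reach `ABORTABLE_ERROR` throws `IllegalStateException`, whereas a guard that checks for the fatal state before sending avoids touching the fenced producer at all.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified transaction states. The names mirror the log above;
// the real TransactionManager has more states and different rules.
enum TxnState { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

// Assumed transition table: FATAL_ERROR is terminal, so nothing may leave it.
class TxnStateModel {
    private static final Map<TxnState, Set<TxnState>> VALID = new EnumMap<>(TxnState.class);

    static {
        VALID.put(TxnState.READY, EnumSet.of(TxnState.IN_TRANSACTION, TxnState.FATAL_ERROR));
        VALID.put(TxnState.IN_TRANSACTION,
                EnumSet.of(TxnState.READY, TxnState.ABORTABLE_ERROR, TxnState.FATAL_ERROR));
        VALID.put(TxnState.ABORTABLE_ERROR, EnumSet.of(TxnState.READY, TxnState.FATAL_ERROR));
        VALID.put(TxnState.FATAL_ERROR, EnumSet.noneOf(TxnState.class));
    }

    private TxnState state = TxnState.READY;

    TxnState state() {
        return state;
    }

    // Throws on any transition not in the table, like the error in the trace.
    void transitionTo(TxnState target) {
        if (!VALID.get(state).contains(target)) {
            throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }
}

// Hypothetical wrapper standing in for the Streams-side producer.
class GuardedSender {
    private final TxnStateModel txn = new TxnStateModel();
    private int sent = 0;

    void beginTransaction() {
        txn.transitionTo(TxnState.IN_TRANSACTION);
    }

    // Simulates the broker fencing this producer (ProducerFencedException).
    void onFenced() {
        txn.transitionTo(TxnState.FATAL_ERROR);
    }

    // Guard: skip the fenced producer and report false, so the caller can
    // close its tasks dirty and rejoin the group (not modeled here).
    // The record payload is ignored in this sketch.
    boolean trySend(String record) {
        if (txn.state() == TxnState.FATAL_ERROR) {
            return false;
        }
        sent++;
        return true;
    }

    // Unguarded path resembling the trace: a send error on the fenced
    // producer tries to move FATAL_ERROR -> ABORTABLE_ERROR and throws.
    void sendErrorOnFencedProducer() {
        txn.transitionTo(TxnState.ABORTABLE_ERROR);
    }

    int sentCount() {
        return sent;
    }

    TxnState state() {
        return txn.state();
    }
}

class FencingDemo {
    public static void main(String[] args) {
        GuardedSender sender = new GuardedSender();
        sender.beginTransaction();
        System.out.println(sender.trySend("r1")); // true
        sender.onFenced();
        System.out.println(sender.trySend("r2")); // false
        try {
            sender.sendErrorOnFencedProducer();
        } catch (IllegalStateException e) {
            // Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
            System.out.println(e.getMessage());
        }
    }
}
```

The design choice here is to check for the fatal state before sending rather than to catch the `IllegalStateException` afterwards, since in this model no transition out of `FATAL_ERROR` exists and recovery has to happen at a higher level (closing tasks dirty and rejoining the group).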
--
This message was sent by Atlassian Jira
(v8.20.10#820010)