[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True resolved KAFKA-14567.
-------------------------------
    Resolution: Cannot Reproduce

We have not been able to reproduce this in our internal test environment 
despite monitoring it for the last several months.

We can reopen when we have another report, ideally with a reproducible test 
case and/or logs.

> Kafka Streams crashes after ProducerFencedException
> ---------------------------------------------------
>
>                 Key: KAFKA-14567
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14567
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer , streams
>    Affects Versions: 3.7.0
>            Reporter: Matthias J. Sax
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: eos, transactions
>             Fix For: 4.2.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashes with a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
>         at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
>         at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
>         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
>         at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
>         at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
>         at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
>         at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
>         at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> Caused by: java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
>         at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
>         at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>         at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
>         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
>         at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
>         ... 6 more
> {quote}
> It seems we call `send()` after the producer has been fenced. However, 
> once a producer is fenced, we should close all tasks dirty and try to 
> rejoin the group; we should not call `send()` on the already-fenced producer.
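The expected handling described in the last paragraph can be sketched as a small, dependency-free Java model. Everything in it is hypothetical: `FencingHandlingSketch`, `FakeProducer`, and `FencedException` (a stand-in for `ProducerFencedException`) are not Kafka Streams classes, and the real stream-thread logic is considerably more involved. The sketch only illustrates the order of operations: on fencing, close all tasks dirty, drop the fenced producer, and rejoin, instead of calling `send()` on that producer again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the fencing handling described above. None of these
// types exist in Kafka Streams; FencedException stands in for
// org.apache.kafka.common.errors.ProducerFencedException.
public class FencingHandlingSketch {

    static class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    // Minimal stand-in for a transactional producer that can be fenced.
    static class FakeProducer {
        private boolean fenced = false;
        final List<String> sent = new ArrayList<>();

        // Simulates a newer producer initializing with the same transactional.id.
        void fence() { fenced = true; }

        void send(String record) {
            if (fenced) {
                throw new FencedException("producer has been fenced");
            }
            sent.add(record);
        }
    }

    final List<String> openTasks = new ArrayList<>(List.of("1_2", "1_3"));
    final List<String> dirtyClosedTasks = new ArrayList<>();
    FakeProducer producer = new FakeProducer();
    int rejoinCount = 0;

    void process(String record) {
        if (producer == null) {
            // Fenced earlier: no tasks and no producer until the rebalance
            // triggered by rejoining assigns new ones, so nothing is sent.
            return;
        }
        try {
            producer.send(record);
        } catch (FencedException e) {
            // Close every task dirty (no commit, no flush of pending output).
            dirtyClosedTasks.addAll(openTasks);
            openTasks.clear();
            // Never call send() on the fenced producer again.
            producer = null;
            // Stand-in for rejoining the consumer group.
            rejoinCount++;
        }
    }

    public static void main(String[] args) {
        FencingHandlingSketch thread = new FencingHandlingSketch();
        thread.process("r1");
        FakeProducer fenced = thread.producer;
        fenced.fence();
        thread.process("r2"); // triggers the dirty close and the rejoin
        thread.process("r3"); // skipped: the fenced producer is not reused
        // prints "[r1] [1_2, 1_3] rejoins=1"
        System.out.println(fenced.sent + " " + thread.dirtyClosedTasks + " rejoins=" + thread.rejoinCount);
    }
}
```

In this model, a record that arrives after the fencing is skipped until new tasks are assigned; the crash in the report corresponds to the opposite behavior, where `send()` still reaches the already-fenced producer.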



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
