[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-14567: ------------------------------------ Description: Running a Kafka Streams application with EOS-v2. We first see a `ProducerFencedException`. After the fencing, the fenced thread crashed resulting in a non-recoverable error: {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) Caused by: java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) ... 6 more {quote} It seems we try to call `send()` after the producer was fenced. However, after a producer was fenced, we should close all tasks dirty, and try to rejoin the group, and should not call `send()` on the already fenced producer. was: Running a Kafka Streams application with EOS-v2. After a thread crashed, we re-spawned a new thread, which implies that the thread-index number was re-used, resulting in a `transactional.id` reuse that led to a `ProducerFencedException`. After the fencing, the fenced thread crashed resulting in a non-recoverable error: {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor) org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_2, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) Caused by: java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) ... 6 more{quote} It seems we try to call `send()` after the producer was fenced. However, after a producer was fenced, we should close all tasks dirty, and try to rejoin the group, and should not call `send()` on the already fenced producer. > Kafka Streams crashes after ProducerFencedException > --------------------------------------------------- > > Key: KAFKA-14567 > URL: https://issues.apache.org/jira/browse/KAFKA-14567 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Matthias J. Sax > Priority: Major > Labels: eos > > Running a Kafka Streams application with EOS-v2. > We first see a `ProducerFencedException`. After the fencing, the fenced > thread crashed resulting in a non-recoverable error: > {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] > stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream > task 1_2 due to the following error: > (org.apache.kafka.streams.processor.internals.TaskExecutor) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. 
taskId=1_2, processor=KSTREAM-SOURCE-0000000005, > topic=node-name-repartition, partition=2, offset=539776276, > stacktrace=java.lang.IllegalStateException: TransactionalId > stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition > attempted from state FATAL_ERROR to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) > at > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) > at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569) > Caused by: 
java.lang.IllegalStateException: TransactionalId > stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition > attempted from state FATAL_ERROR to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620) > at > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959) > at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722) > ... 6 more > {quote} > It seems we try to call `send()` after the producer was fenced. However, > after a producer was fenced, we should close all tasks dirty, and try to > rejoin the group, and should not call `send()` on the already fenced producer. -- This message was sent by Atlassian Jira (v8.20.10#820010)