[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414732#comment-17414732 ]
NEERAJ VAIDYA edited comment on KAFKA-13292 at 9/14/21, 5:49 AM:
-----------------------------------------------------------------

Thanks [~mjsax] for the reference to the KIP. It might not be possible for my application to be upgraded soon.

However, I still feel this is a defect: I expected stream processing to continue even after transactional.id.expiration.ms has elapsed, rather than bringing down the entire application.

I noticed from the logs that the application does try to get a new epoch and producer ID, but it still suspends and stops all threads.

As you will observe towards the end of the log messages below (at timestamp 2021-09-10T12:21:59.766), for task [0_2] the call "*Invoking InitProducerId*" is made, but the flow still ends up in the UncaughtExceptionHandler. My understanding was that once the producer ID epoch is bumped, stream processing should resume. Instead, the uncaught exception starts shutting down the entire application.

*FYI - I am using exactly_once processing and enable.idempotence=true*

{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
2021-09-10T12:21:59.741 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Shutting down
2021-09-10T12:21:59.743 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2021-09-10T12:21:59.744 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamTask - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_8] Suspended running
2021-09-10T12:21:59.747 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): mtx-caf-DuplicateCheckStore-changelog-10, mtx-caf-DuplicateCheckStore-changelog-6, mtx-caf-DuplicateCheckStore-changelog-4, mtx-caf-DuplicateCheckStore-changelog-2, mtx-caf-AggregatedRecordStore-changelog-10, mtx-caf-DuplicateCheckStore-changelog-0, mtx-caf-AggregatedRecordStore-changelog-4, mtx-caf-AggregatedRecordStore-changelog-6, mtx-caf-AggregatedRecordStore-changelog-0, mtx-caf-AggregatedRecordStore-changelog-2, mtx-caf-DuplicateCheckStore-changelog-11, mtx-caf-DuplicateCheckStore-changelog-9, mtx-caf-DuplicateCheckStore-changelog-7, mtx-caf-AggregatedRecordStore-changelog-11, mtx-caf-DuplicateCheckStore-changelog-5, mtx-caf-DuplicateCheckStore-changelog-3, mtx-caf-AggregatedRecordStore-changelog-7, mtx-caf-DuplicateCheckStore-changelog-1, mtx-caf-AggregatedRecordStore-changelog-9, mtx-caf-AggregatedRecordStore-changelog-3, mtx-caf-AggregatedRecordStore-changelog-5, mtx-caf-AggregatedRecordStore-changelog-1
2021-09-10T12:21:59.753 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_8] Closing record collector dirty
... ... ... ...
2021-09-10T12:21:59.763 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamTask - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Suspended running
2021-09-10T12:21:59.764 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): mtx-caf-DuplicateCheckStore-changelog-11, mtx-caf-DuplicateCheckStore-changelog-10, mtx-caf-DuplicateCheckStore-changelog-9, mtx-caf-DuplicateCheckStore-changelog-7, mtx-caf-DuplicateCheckStore-changelog-6, mtx-caf-AggregatedRecordStore-changelog-11, mtx-caf-DuplicateCheckStore-changelog-4, mtx-caf-AggregatedRecordStore-changelog-7, mtx-caf-AggregatedRecordStore-changelog-10, mtx-caf-DuplicateCheckStore-changelog-0, mtx-caf-AggregatedRecordStore-changelog-9, mtx-caf-AggregatedRecordStore-changelog-4, mtx-caf-AggregatedRecordStore-changelog-6, mtx-caf-AggregatedRecordStore-changelog-0
2021-09-10T12:21:59.765 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Closing record collector dirty
2021-09-10T12:21:59.765 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.clients.producer.KafkaProducer - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Aborting incomplete transaction
2021-09-10T12:21:59.766 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Invoking InitProducerId with current producer ID and epoch (producerId=1005, epoch=16) in order to bump the epoch
2021-09-10T12:21:59.769 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] ProducerId set to 1006 with epoch 0
... ... ...
2021-09-10T12:21:59.798 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.clients.producer.KafkaProducer - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
{code}

> InvalidPidMappingException: The producer attempted to use a producer id which
> is not currently assigned to its transactional id
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: NEERAJ VAIDYA
> Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12
> partitions. The incoming message rate into this topic is very low, perhaps
> 3-4 per minute. Also, some partitions will not receive messages for more than
> 7 days.
>
> Exactly 7 days after starting this application, I seem to be getting the
> following exception and the application shuts down, without processing
> any more messages:
>
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread |
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer]
> INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
> transactionalId=mtx-caf-0_2] Transiting to abortable error state due to
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer
> attempted to use a producer id which is not currently assigned to its
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread |
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer]
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2]
> Error encountered sending record to topic
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer
> attempted to use a producer id which is not currently assigned to its
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be
> sent.
> 2021-09-10T12:21:59.740
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer
> attempted to use a producer id which is not currently assigned to its
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be
> sent.
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
> at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The
> producer attempted to use a producer id which is not currently assigned to
> its transactional id.
> 2021-09-10T12:21:59.740
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>
> After this, I can see that all 12 tasks (because there are 12 partitions for
> all topics) get shut down, and this brings down the whole application.
>
> I understand that transactional.id.expiration.ms = 7 days (default) will
> likely cause the application thread to expire, but why does this
> specific thread/task not get fenced or respawned?
> Why shut down the entire Streams processing application just because one task
> has been idle?
>
> Is there a way to keep my application up and running without causing it to
> shut down?

--
This message was sent by Atlassian Jira (v8.3.4#803005)