[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414732#comment-17414732
 ] 

NEERAJ VAIDYA edited comment on KAFKA-13292 at 9/14/21, 5:49 AM:
-----------------------------------------------------------------

Thanks [~mjsax] for the reference to the KIP. It might not be possible for my 
application to be upgraded soon.

However, I still feel this is a defect, because I believe stream processing 
should continue even after transactional.id.expiration.ms has elapsed, rather 
than bringing down the entire application.
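
For reference, here is a minimal sketch of the expiry arithmetic behind this setting. It is not Kafka code: the class and method names are hypothetical, the timestamps are made-up examples, and the rule is simplified to "idle time since the last transaction status update exceeds the configured expiration". The broker removes expired IDs on its own schedule, so real timing may lag behind this check.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative model only; hypothetical names, not part of the Kafka API. */
final class TransactionalIdExpiry {

    // Broker default for transactional.id.expiration.ms is 7 days.
    static final Duration DEFAULT_EXPIRATION = Duration.ofDays(7);

    /**
     * Returns true when the time since the last transaction status update
     * is strictly longer than the configured expiration (simplified rule).
     */
    static boolean isExpired(Instant lastTransactionUpdate, Instant now, Duration expiration) {
        return Duration.between(lastTransactionUpdate, now).compareTo(expiration) > 0;
    }

    public static void main(String[] args) {
        // Assumed example instants; the real log timestamps carry no time zone.
        Instant now = Instant.parse("2021-09-10T12:21:59Z");
        Instant idleTask = now.minus(Duration.ofDays(7)).minusSeconds(1);
        Instant busyTask = now.minus(Duration.ofMinutes(1));

        System.out.println("expiration ms = " + DEFAULT_EXPIRATION.toMillis());
        System.out.println("idle task expired = " + isExpired(idleTask, now, DEFAULT_EXPIRATION));
        System.out.println("busy task expired = " + isExpired(busyTask, now, DEFAULT_EXPIRATION));
    }
}
```

Under this simplified model, a task whose partition has been silent for more than 7 days is the one whose next send would be rejected, while tasks with recent traffic are unaffected.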

I noticed from the logs that the application does try to get a new epoch and 
producer ID, but then still suspends and stops all threads. As you will observe 
below towards the end of the log messages (at timestamp 
2021-09-10T12:21:59.766), for task [0_2], the call to "*Invoking 
InitProducerId*" is made, but the flow still ends up in the 
UncaughtExceptionHandler.

My understanding was that once the producerId epoch is bumped, stream 
processing should resume.

Instead, an uncaught exception starts shutting down the entire application.

*FYI - I am using exactly_once processing and enable.idempotence=true*

 
{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Transiting to abortable 
error state due to org.apache.kafka.common.errors.InvalidPidMappingException: 
The producer attempted to use a producer id which is not currently assigned to 
its transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
Error encountered sending record to topic 
mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception
 during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
        at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
2021-09-10T12:21:59.741 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Shutting down
2021-09-10T12:21:59.743 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2021-09-10T12:21:59.744 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamTask - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_8] 
Suspended running
2021-09-10T12:21:59.747 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-restore-consumer,
 groupId=null] Subscribed to partition(s): 
mtx-caf-DuplicateCheckStore-changelog-10, 
mtx-caf-DuplicateCheckStore-changelog-6, 
mtx-caf-DuplicateCheckStore-changelog-4, 
mtx-caf-DuplicateCheckStore-changelog-2, 
mtx-caf-AggregatedRecordStore-changelog-10, 
mtx-caf-DuplicateCheckStore-changelog-0, 
mtx-caf-AggregatedRecordStore-changelog-4, 
mtx-caf-AggregatedRecordStore-changelog-6, 
mtx-caf-AggregatedRecordStore-changelog-0, 
mtx-caf-AggregatedRecordStore-changelog-2, 
mtx-caf-DuplicateCheckStore-changelog-11, 
mtx-caf-DuplicateCheckStore-changelog-9, 
mtx-caf-DuplicateCheckStore-changelog-7, 
mtx-caf-AggregatedRecordStore-changelog-11, 
mtx-caf-DuplicateCheckStore-changelog-5, 
mtx-caf-DuplicateCheckStore-changelog-3, 
mtx-caf-AggregatedRecordStore-changelog-7, 
mtx-caf-DuplicateCheckStore-changelog-1, 
mtx-caf-AggregatedRecordStore-changelog-9, 
mtx-caf-AggregatedRecordStore-changelog-3, 
mtx-caf-AggregatedRecordStore-changelog-5, 
mtx-caf-AggregatedRecordStore-changelog-1
2021-09-10T12:21:59.753 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_8] 
Closing record collector dirty
...
...
...
...
2021-09-10T12:21:59.763 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamTask - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
Suspended running
2021-09-10T12:21:59.764 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-restore-consumer,
 groupId=null] Subscribed to partition(s): 
mtx-caf-DuplicateCheckStore-changelog-11, 
mtx-caf-DuplicateCheckStore-changelog-10, 
mtx-caf-DuplicateCheckStore-changelog-9, 
mtx-caf-DuplicateCheckStore-changelog-7, 
mtx-caf-DuplicateCheckStore-changelog-6, 
mtx-caf-AggregatedRecordStore-changelog-11, 
mtx-caf-DuplicateCheckStore-changelog-4, 
mtx-caf-AggregatedRecordStore-changelog-7, 
mtx-caf-AggregatedRecordStore-changelog-10, 
mtx-caf-DuplicateCheckStore-changelog-0, 
mtx-caf-AggregatedRecordStore-changelog-9, 
mtx-caf-AggregatedRecordStore-changelog-4, 
mtx-caf-AggregatedRecordStore-changelog-6, 
mtx-caf-AggregatedRecordStore-changelog-0
2021-09-10T12:21:59.765 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
Closing record collector dirty
2021-09-10T12:21:59.765 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.clients.producer.KafkaProducer - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Aborting incomplete transaction
2021-09-10T12:21:59.766 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Invoking InitProducerId with current producer ID 
and epoch (producerId=1005, epoch=16) in order to bump the epoch
2021-09-10T12:21:59.769 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] ProducerId set to 1006 with epoch 0
...
...
...
2021-09-10T12:21:59.798 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.clients.producer.KafkaProducer - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.

{code}
 



> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13292
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13292
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: NEERAJ VAIDYA
>            Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly 7 days after starting this application, I get the following 
> exception and the application shuts down without processing any more 
> messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shut down, and this brings down the whole application.
>  
> I understand that transactional.id.expiration.ms = 7 days (default) will 
> likely cause the application thread to expire, but why does this specific 
> thread/task not get fenced or respawned?
> Why shut down the entire Streams processing application just because one task 
> has been idle?
>  
> Is there a way to keep my application up and running instead of having it 
> shut down?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
