John Roesler created KAFKA-9310:
-----------------------------------

             Summary: StreamThread may die from recoverable UnknownProducerId 
exception
                 Key: KAFKA-9310
                 URL: https://issues.apache.org/jira/browse/KAFKA-9310
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.4.0
            Reporter: John Roesler


We attempted to catch and recover from UnknownProducerId exceptions in 
KAFKA-9231, but the exception can still be raised wrapped in a 
KafkaException, which kills the thread.

For example, see the stack trace:
{noformat}
[2019-12-17 00:08:53,064] ERROR 
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
stream-thread 
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
Encountered the following unexpected Kafka exception during processing, this 
usually indicate Streams internal errors: 
(org.apache.kafka.streams.processor.internals.StreamThread)
  org.apache.kafka.streams.errors.StreamsException: Exception caught in 
process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031, 
topic=windowed-node-counts, partition=1, offset=354933575, 
stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
sending since an error caught with a previous record (timestamp 1575857317197) 
to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog 
due to org.apache.kafka.common.KafkaException: Cannot perform send because at 
least one previous transactional or idempotent request has failed with errors.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
        at 
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
        at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
        at 
org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
        at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because 
at least one previous transactional or idempotent request has failed with 
errors.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
        ... 29 more
 Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
exception is raised by the broker if it could not locate the producer metadata 
associated with the producerId in question. This could happen if, for instance, 
the producer's records were deleted because their retention time had elapsed. 
Once the last records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will return this 
exception.

        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
        at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
 Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
sending since an error caught with a previous record (timestamp 1575857317197) 
to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog 
due to org.apache.kafka.common.KafkaException: Cannot perform send because at 
least one previous transactional or idempotent request has failed with errors.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
        at 
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
        at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
        at 
org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
        ... 5 more
 Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because 
at least one previous transactional or idempotent request has failed with 
errors.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
        ... 29 more
 Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
exception is raised by the broker if it could not locate the producer metadata 
associated with the producerId in question. This could happen if, for instance, 
the producer's records were deleted because their retention time had elapsed. 
Once the last records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will return this 
exception.
 [2019-12-17 00:08:53,066] INFO 
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
stream-thread 
[stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State 
transition from RUNNING to PENDING_SHUTDOWN 
(org.apache.kafka.streams.processor.internals.StreamThread)

{noformat}

The catch blocks should be updated to recognize the exception in this wrapped 
form as well, by inspecting the cause chain rather than only the top-level 
exception type.
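
For illustration, here is a minimal sketch of the kind of cause-chain 
inspection the catch blocks would need. The class and helper names are 
hypothetical, not existing Streams code:
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownProducerIdException;

public final class RecoverableErrorCheck {

    // Hypothetical helper: walk the cause chain so that an
    // UnknownProducerIdException is still recognized when it arrives
    // wrapped in a KafkaException (or a StreamsException around that).
    static boolean isRecoverableUnknownProducerId(Throwable t) {
        while (t != null) {
            if (t instanceof UnknownProducerIdException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulate the wrapping seen in the stack trace above:
        // a KafkaException caused by an UnknownProducerIdException.
        KafkaException wrapped = new KafkaException(
            "Cannot perform send because at least one previous "
                + "transactional or idempotent request has failed with errors.",
            new UnknownProducerIdException("producer metadata not found"));

        System.out.println(isRecoverableUnknownProducerId(wrapped)); // true
        System.out.println(isRecoverableUnknownProducerId(
            new KafkaException("unrelated"))); // false
    }
}
{code}
A check along these lines would let the recovery path added in KAFKA-9231 
fire regardless of how many layers of wrapping the producer applies.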



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
