[
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528007#comment-16528007
]
Collin Scangarella commented on KAFKA-6817:
-------------------------------------------
Is there a way to update the offset when using the ValueTransformer workaround?
Every time we start the stream, we find that we are reprocessing expired
messages.
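For reference, one possible stopgap (a sketch only, not a confirmed fix for this issue) is to move the group's committed offsets past the already-processed range before restarting, using the consumer-groups tool that ships with Kafka. The group id and topic below are placeholders; the Streams application must be stopped while the reset runs.

```shell
# Placeholder group/topic names; the group id equals the Streams application.id.
# Dry-run first to see what would change:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-streams-app --topic exactly-once-test-topic-v2 \
  --reset-offsets --to-latest --dry-run

# Apply the reset once the output looks right:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-streams-app --topic exactly-once-test-topic-v2 \
  --reset-offsets --to-latest --execute
```

This only skips the reprocessing; it does not address the underlying producer-state expiry described in the issue.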
> UnknownProducerIdException when writing messages with old timestamps
> --------------------------------------------------------------------
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 1.1.0
> Reporter: Odin Standal
> Priority: Major
>
> We are seeing the following exception in our Kafka application:
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 222222 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
> 	at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> 	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> 	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
> 	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
> 	at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
> 	at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
> 	at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
> 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
> 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we needed to reprocess old messages. See
> more details on
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error
> occurs after 10 minutes of producing messages that carry old timestamps
> (think: 1 year old). The topic we are writing to has retention.ms set to
> 1 year, so we expect the messages to stay there.
> After digging through the ProducerStateManager code in the Kafka source,
> we have a theory about what is going wrong.
> ProducerStateManager.removeExpiredProducers() appears to erroneously remove
> producers from memory when processing records older than
> maxProducerIdExpirationMs (derived from the `transactional.id.expiration.ms`
> configuration, which defaults to 7 days).
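The suspected behaviour can be sketched as follows. This is a hypothetical simplification for illustration, not the actual ProducerStateManager code: the point is that the expiry check compares the record's own timestamp against wall-clock time, independently of the topic's retention.ms.

```java
// Hypothetical sketch of the expiry rule described above (not the real
// broker code): producer metadata is dropped when the timestamp of the
// producer's last record is older than transactional.id.expiration.ms.
public class ProducerStateExpirySketch {
    // Default for transactional.id.expiration.ms: 7 days in milliseconds.
    static final long MAX_PRODUCER_ID_EXPIRATION_MS = 7L * 24 * 60 * 60 * 1000;

    // True when the broker would consider the producer's state expired.
    static boolean isProducerExpired(long currentTimeMs, long lastRecordTimestampMs) {
        return currentTimeMs - lastRecordTimestampMs >= MAX_PRODUCER_ID_EXPIRATION_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long oneYearAgo = now - 365L * 24 * 60 * 60 * 1000;

        // A freshly appended record carrying a year-old timestamp is treated
        // as expired immediately, even though retention.ms = 1 year keeps the
        // record itself on the log. Once the producerId is removed, the next
        // append from that producer fails with UnknownProducerIdException.
        System.out.println(isProducerExpired(now, oneYearAgo)); // prints "true"
        System.out.println(isProducerExpired(now, now));        // prints "false"
    }
}
```

Under this reading, the bug is that record timestamps (which an application may legitimately set far in the past) are used where an append time would be the appropriate clock.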
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)