[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-06-26 Thread Odin Standal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523343#comment-16523343
 ] 

Odin Standal commented on KAFKA-6817:
-

Would it be possible to change transactional.id.expiration.ms to a long?

While not a proper solution to the problem, at least that would enable us to 
reprocess old messages, which would greatly increase the value of Kafka for us.



[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-26 Thread Odin Standal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453593#comment-16453593
 ] 

Odin Standal commented on KAFKA-6817:
-

Thanks for following up [~apurva]. Being able to use the Kafka record timestamp 
as domain time is very useful, so we hope this gets fixed. 

As a workaround for now, we are rewriting our application to keep track of the 
domain time ourselves inside the record payload instead of using the Kafka 
record timestamp. However, this complicates our code, as we now have to make 
sure the domain time is propagated whenever we transform records.
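
Roughly, the workaround looks like the sketch below (the wrapper class, topic 
names and serde are illustrative placeholders, not our actual code):
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical wrapper: the domain time travels inside the payload,
// so it survives even when the Kafka record timestamp cannot be relied on.
class TimedValue {
    final long domainTimeMs; // business/event time, NOT the record timestamp
    final String value;
    TimedValue(long domainTimeMs, String value) {
        this.domainTimeMs = domainTimeMs;
        this.value = value;
    }
}

class DomainTimeWorkaround {
    static void buildTopology(StreamsBuilder builder, Serde<TimedValue> timedValueSerde) {
        KStream<String, TimedValue> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), timedValueSerde));

        // Every transformation has to copy domainTimeMs forward by hand,
        // which is the extra complexity mentioned above.
        input.mapValues(tv -> new TimedValue(tv.domainTimeMs, tv.value.toUpperCase()))
             .to("output-topic", Produced.with(Serdes.String(), timedValueSerde));
    }
}
{code}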



[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-25 Thread Odin Standal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451764#comment-16451764
 ] 

Odin Standal commented on KAFKA-6817:
-

[~apurva] Yes, we tried that, only to realize that transactional.id.expiration.ms 
is just an int, which means it can be at most about 24 days (Integer.MAX_VALUE 
milliseconds). Additionally, it would represent a potential memory leak, since 
the broker would then hold on to producer metadata for a very long time.
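
For illustration, the arithmetic behind that ceiling (plain Java, nothing 
Kafka-specific):
{code:java}
public class ExpirationLimit {
    public static void main(String[] args) {
        long maxIntMs = Integer.MAX_VALUE;            // 2_147_483_647 ms
        System.out.println(maxIntMs / 86_400_000.0);  // ~24.85 days: the most an int-valued ms config can hold

        long oneYearMs = 365L * 24 * 60 * 60 * 1000;  // 31_536_000_000 ms
        System.out.println(oneYearMs > Integer.MAX_VALUE); // true: one year does not fit in an int
    }
}
{code}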



[jira] [Updated] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-23 Thread Odin Standal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Odin Standal updated KAFKA-6817:

Description: 
We are seeing the following exception in our Kafka application: 
{code:java}
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 22 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
{code}
We discovered this error when we needed to reprocess old messages. See more 
details on 
[Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]

We have reproduced the error with a smaller example application. The error 
occurs after 10 minutes of producing messages that have old timestamps 
(typically 1 year old). The topic we are writing to has retention.ms set to 
1 year, so we expect the messages to stay there.
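
The gist of the reproduction is simply producing records with explicit, very 
old timestamps, roughly like the sketch below (broker address, topic and 
serializers are placeholders; our actual test is a Streams application with 
exactly-once enabled):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OldTimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // producer gets a producerId

        long oneYearAgo = System.currentTimeMillis() - 365L * 24 * 60 * 60 * 1000;
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Explicit record timestamp far in the past, as when reprocessing old data.
            producer.send(new ProducerRecord<>("exactly-once-test-topic-v2", null, oneYearAgo,
                    "22", "some-value"));
        }
    }
}
{code}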

After digging through the ProducerStateManager code in the Kafka source, we 
have a theory about what might be wrong.

ProducerStateManager.removeExpiredProducers() seems to remove producers from 
memory erroneously when processing records that are older than 
maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
configuration), which is set to 7 days by default.
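
In other words, our theory is that the broker-side check behaves roughly like 
this simplified Java paraphrase (names and types are simplified for 
illustration; the real code is Scala in ProducerStateManager):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration of the theory: the expiration check compares the
// current wall-clock time against the timestamp of the producer's last record.
// If that record carries an old domain timestamp, the producer state is dropped
// even though the producer is still actively writing.
class ProducerStateSketch {
    static class ProducerIdEntry {
        long lastTimestamp; // taken from the last appended record's timestamp
    }

    final Map<Long, ProducerIdEntry> producers = new ConcurrentHashMap<>();
    final long maxProducerIdExpirationMs; // transactional.id.expiration.ms, default 7 days

    ProducerStateSketch(long maxProducerIdExpirationMs) {
        this.maxProducerIdExpirationMs = maxProducerIdExpirationMs;
    }

    void removeExpiredProducers(long currentTimeMs) {
        // A record timestamped 1 year ago is always "older" than 7 days,
        // so its producerId is evicted almost immediately after the append.
        producers.values().removeIf(entry ->
                currentTimeMs - entry.lastTimestamp >= maxProducerIdExpirationMs);
    }
}
{code}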


[jira] [Updated] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-23 Thread Odin Standal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Odin Standal updated KAFKA-6817:

Description: 
We are seeing the following exception in our Kafka application:
{code:java}
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 22 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
{code}
We discovered this error when we needed to reprocess old messages.

We have reproduced the error with a smaller example application. The error 
occurs after 10 minutes of producing messages that have old timestamps 
(typically 1 year old). The topic we are writing to has retention.ms set to 
1 year, so we expect the messages to stay there.

After digging through the ProducerStateManager code in the Kafka source, we 
have a theory about what might be wrong.

ProducerStateManager.removeExpiredProducers() seems to remove producers from 
memory erroneously when processing records that are older than 
maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
configuration), which is set to 7 days by default.

 


[jira] [Created] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-23 Thread Odin Standal (JIRA)
Odin Standal created KAFKA-6817:
---

 Summary: UnknownProducerIdException when writing messages with old 
timestamps
 Key: KAFKA-6817
 URL: https://issues.apache.org/jira/browse/KAFKA-6817
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.1.0
Reporter: Odin Standal


We are seeing the following exception in our Kafka application:
{code:java}
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 22 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
{code}
We discovered this error when we needed to reprocess old messages.

We have reproduced the error with a smaller example application. The error 
occurs after 10 minutes of producing messages that have old timestamps 
(typically 1 year old). The topic we are writing to has retention.ms set to 
1 year, so we expect the messages to stay there.

After digging through the ProducerStateManager code in the Kafka source, we 
have a theory about what might be wrong.

ProducerStateManager.removeExpiredProducers() seems to remove producers from 
memory erroneously when processing records that are older than 
maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
configuration), which is set to 7 days by default.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)