[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584462#comment-16584462 ]

Jason Gustafson commented on KAFKA-7190:
----------------------------------------

Sorry, this will be a lengthy comment because this is a surprisingly complex 
issue.

I've been giving this some more thought since this problem keeps recurring. In 
fact, the way the producer works currently is close to what I suggested above. 
When the producer receives an UNKNOWN_PRODUCER error, it attempts to reset the 
sequence number if it believes it is safe to do so. This is actually 
problematic for the reasons I mentioned above. Whenever we reuse a sequence 
number, we are violating our uniqueness assumption which means some guarantees 
go out the door (at least theoretically). In other words, I think the current 
workaround was just a bad idea.
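
To make the sequence reuse concrete, here is a toy sketch of that reset path. 
It is not the actual producer internals; the class and names are made up 
purely for illustration:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy model of the current workaround: when the broker reports
// UNKNOWN_PRODUCER_ID, the client rewinds its sequence for that partition and
// retries, which reuses (producerId, sequence) pairs.
class SequenceTracker {
    private final Map<String, Integer> nextSequences = new HashMap<>();

    // Returns the sequence to attach to the next batch for this partition.
    int nextSequence(String topicPartition) {
        return nextSequences.merge(topicPartition, 1, Integer::sum) - 1;
    }

    // The "reset" path taken today on UNKNOWN_PRODUCER_ID. Reusing sequence
    // numbers breaks the uniqueness assumption that idempotence relies on,
    // which is the problem described above.
    void resetOnUnknownProducer(String topicPartition) {
        nextSequences.put(topicPartition, 0);
    }
}
{code}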

The approach suggested here is actually safer for the common case. However, the 
main drawback is that we lose the consistency between the cached producer state 
and the state in the log. In the worst case, if we have to rebuild producer 
state using the log, then we will lose some of the producers, which puts us 
back in the position of handling UNKNOWN_PRODUCER errors in the clients. For 
example, this would happen upon partition reassignment since the new replica 
will reload producer state using the log. This can cause surprising behavior. 
For example, a producer that is fenced by the cached state on one leader may 
become unfenced after moving to a reassigned leader that rebuilt its state 
using only the log. Alternatively, a sequence number that was valid on the old 
leader may become invalid after a leader failover.
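
As a rough illustration of why rebuilding from the log loses producers, here 
is a toy version of the rebuild (the real broker code is more involved; these 
names are illustrative only):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration: a replica that rebuilds producer state purely from the
// log only sees producer ids that still have batches in the log. A producer
// whose records have all been deleted vanishes from the rebuilt state, along
// with the epoch and sequence used for fencing and duplicate detection.
class ProducerStateRebuild {
    record Batch(long producerId, short epoch, int lastSequence) {}
    record ProducerEntry(short epoch, int lastSequence) {}

    static Map<Long, ProducerEntry> rebuildFromLog(List<Batch> remainingBatches) {
        Map<Long, ProducerEntry> state = new HashMap<>();
        for (Batch b : remainingBatches) {
            // A producer with no surviving batches never appears here, so the
            // rebuilt replica cannot fence it or detect its duplicates.
            state.put(b.producerId(), new ProducerEntry(b.epoch(), b.lastSequence()));
        }
        return state;
    }
}
{code}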

I think the basic flaw here is that we allow the monotonicity of producer 
writes to be violated in two cases. First, we violate it when we reset the 
sequence number after receiving an UNKNOWN_PRODUCER error. Second, we violate 
it because our fencing cannot protect us when we don't have producer state.

Understanding the problem at least suggests possible solutions. Here is what 
I'm thinking:

1. We need to add fencing for the first transactional write from a producer. 
Basically I think we need a new inter-broker API, say CheckProducerFenced, 
which can verify whether an epoch is correct when there is no local state that 
can be relied upon. 
2. When we encounter an UNKNOWN_PRODUCER error in the client, we need a safe 
way to bump the epoch in order to continue. We can update the InitProducerId 
API to include an optional epoch. When the transaction coordinator receives 
the request, it can verify that the epoch matches the current epoch before 
incrementing it. That way the producer will not mistakenly fence another 
producer (see the sketch after this list).
3. If we receive UNKNOWN_PRODUCER and we are in a transaction, we should 
probably just abort. After aborting, we can bump the epoch and safely continue.
4. For the idempotent producer, if we get UNKNOWN_PRODUCER, it should be safe 
to bump the epoch locally because the producer id is guaranteed to be unique.
5. Once we have these fixes, the submitted patch becomes more attractive. We 
can keep producers in the cache longer than their state exists in the log. We 
may still get an unexpected UNKNOWN_PRODUCER error due to the possible 
inconsistency between the leader and follower (e.g. as a result of 
reassignment), but it should be a rare case and we can always abort the 
transaction, bump the epoch, and continue. In any case, our guarantees will not 
be violated.
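
To illustrate point 2, here is a hypothetical sketch of how the transaction 
coordinator could check an epoch supplied in InitProducerId before bumping it. 
Nothing here is the actual coordinator code or wire format; the names are just 
for illustration:

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of point 2: if InitProducerId carried the producer's
// current epoch, the coordinator could verify it before bumping, so a zombie
// producer cannot accidentally fence a live one.
class EpochBumpSketch {
    private final Map<Long, Short> currentEpochs = new ConcurrentHashMap<>();

    // expectedEpoch empty: legacy behavior, bump unconditionally.
    // expectedEpoch present: bump only if it matches the coordinator's view;
    // otherwise return empty, and the caller would see a fencing error.
    Optional<Short> maybeBumpEpoch(long producerId, Optional<Short> expectedEpoch) {
        Short current = currentEpochs.getOrDefault(producerId, (short) 0);
        if (expectedEpoch.isPresent() && !expectedEpoch.get().equals(current)) {
            return Optional.empty();
        }
        short bumped = (short) (current + 1);
        currentEpochs.put(producerId, bumped);
        return Optional.of(bumped);
    }
}
{code}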

This is pretty high level, but I'm happy to flesh out the details in a KIP. 
However, I probably don't have the time to implement it. [~lambdaliu] If you 
think this is a good idea, perhaps you'd be willing to help out?

> Under low traffic conditions purging repartition topics cause WARN statements 
> about UNKNOWN_PRODUCER_ID
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7190
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7190
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, streams
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Bill Bejeck
>            Assignee: lambdaliu
>            Priority: Major
>
> When a Streams application has little traffic, it is possible that consumer
> purging will delete even the last message sent by a producer (i.e., all the
> messages sent by this producer have been consumed and committed), and as a
> result, the broker will delete that producer's ID. The next time this
> producer tries to send, it will get the UNKNOWN_PRODUCER_ID error code, but
> in this case the error is retriable: the producer can simply get a new
> producer ID, retry, and succeed.
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of
> producer IDs for a longer period, or on the Streams side, developing a more
> conservative approach to deleting offsets from repartition topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
