[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584462#comment-16584462 ]
Jason Gustafson commented on KAFKA-7190:
----------------------------------------

Sorry, this will be a lengthy comment because this is a surprisingly complex issue. I've been giving this some more thought since this problem keeps recurring.

In fact, the way the producer works currently is close to what I suggested above. When the producer receives an UNKNOWN_PRODUCER error, it attempts to reset the sequence number if it believes it is safe to do so. This is actually problematic for the reasons I mentioned above: whenever we reuse a sequence number, we violate our uniqueness assumption, which means some guarantees go out the door (at least theoretically). In other words, I think the current workaround was just a bad idea.

The approach suggested here is actually safer for the common case. However, the main drawback is that we lose the consistency between the cached producer state and the state in the log. In the worst case, if we have to rebuild producer state from the log, then we will lose some of the producers, which puts us back in the position of handling UNKNOWN_PRODUCER errors in the clients. For example, this would happen upon partition reassignment, since the new replica will reload producer state from the log. This can cause surprising behavior in some cases. For example, a producer which is fenced by the cached state in one leader may become unfenced after a switch to a reassigned leader which rebuilt its state using only the log. Alternatively, a sequence number that was valid on the old leader may become invalid after a leader failover.

I think the basic flaw here is that we allow the monotonicity of producer writes to be violated in two cases. First, we violate it when we reset the sequence number after receiving an UNKNOWN_PRODUCER error. Second, we violate it because our fencing cannot protect us when we don't have producer state. Understanding the problem at least suggests possible solutions. Here is what I'm thinking:

1. We need to add fencing for the first transactional write from a producer. Basically, I think we need a new inter-broker API, say CheckProducerFenced, which can verify whether an epoch is correct when there is no local state that can be relied upon.
2. When we encounter an UNKNOWN_PRODUCER error in the client, we need a safe way to bump the epoch in order to continue. We can update the InitProducerId API to include an optional epoch. When the transaction coordinator receives the request, it can verify that the epoch matches the current epoch before incrementing it. That way the producer will not mistakenly fence another producer. (A rough sketch of this check follows at the end of this comment.)
3. If we receive UNKNOWN_PRODUCER and we are in a transaction, we should probably just abort. After aborting, we can bump the epoch and safely continue.
4. For the idempotent producer, if we get UNKNOWN_PRODUCER, it should be safe to bump the epoch locally because the producer id is guaranteed to be unique.
5. Once we have these fixes, the submitted patch becomes more attractive. We can keep producers in the cache longer than their state exists in the log. We may still get an unexpected UNKNOWN_PRODUCER error due to the possible inconsistency between the leader and follower (e.g. as a result of reassignment), but it should be a rare case and we can always abort the transaction, bump the epoch, and continue. In any case, our guarantees will not be violated.

This is all pretty high level, but I'm happy to flesh out the details in a KIP. However, I probably don't have the time to implement it.
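To make point 2 a bit more concrete, here is a minimal, hypothetical Java sketch of the coordinator-side decision, not the actual Kafka implementation: the InitProducerId request would carry an optional expected epoch, and the coordinator only bumps when that expectation matches its current state. The names (EpochBumpSketch, maybeBumpEpoch, NO_EPOCH) are made up for illustration.

{code:java}
// Hypothetical sketch of the epoch-bump check proposed in point 2.
// Not Kafka code: all names here are illustrative only.
final class EpochBumpSketch {

    // Sentinel meaning the request did not supply an epoch (a brand new producer).
    static final short NO_EPOCH = -1;

    static final class Result {
        final boolean fenced;   // true if the caller held a stale epoch and must not continue
        final short epoch;      // the epoch the caller may use going forward
        Result(boolean fenced, short epoch) {
            this.fenced = fenced;
            this.epoch = epoch;
        }
    }

    /**
     * currentEpoch: the epoch the coordinator has on record for this producer id.
     * requestEpoch: the epoch the producer claims to hold, or NO_EPOCH for a new producer.
     */
    static Result maybeBumpEpoch(short currentEpoch, short requestEpoch) {
        if (requestEpoch != NO_EPOCH && requestEpoch != currentEpoch) {
            // A stale (possibly zombie) producer: refuse the bump so it cannot
            // fence the producer that legitimately holds the current epoch.
            return new Result(true, currentEpoch);
        }
        // Either a brand new producer or one that proved it holds the current
        // epoch, so it is safe to increment and hand back the bumped epoch.
        return new Result(false, (short) (currentEpoch + 1));
    }

    public static void main(String[] args) {
        System.out.println(maybeBumpEpoch((short) 5, (short) 5).epoch);   // 6: legitimate bump
        System.out.println(maybeBumpEpoch((short) 5, (short) 3).fenced);  // true: stale producer rejected
        System.out.println(maybeBumpEpoch((short) 5, NO_EPOCH).epoch);    // 6: fresh producer
    }
}
{code}

The property this is meant to capture is that a request carrying a stale epoch is rejected rather than handed a fresh epoch, so a zombie can never fence the live producer; only a producer that can prove it holds the current epoch (or a genuinely new one with no epoch at all) gets the bump.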
[~lambdaliu] If you think this is a good idea, perhaps you'd be willing to help out?

> Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7190
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7190
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, streams
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Bill Bejeck
>            Assignee: lambdaliu
>            Priority: Major
>
> When a streams application has little traffic, it is possible that consumer purging will delete even the last message sent by a producer (i.e., all the messages sent by this producer have been consumed and committed), and as a result the broker will delete that producer's ID. The next time this producer tries to send, it will get the UNKNOWN_PRODUCER_ID error code, but in this case the error is retriable: the producer can simply get a new producer id and retry, and this time it will succeed.
>
> Possible fixes could be on the broker side, i.e., delaying the deletion of the producer IDs for a more extended period, or on the streams side, developing a more conservative approach to deleting offsets from repartition topics.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)