[ https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845497#comment-17845497 ]

Justine Olshan commented on KAFKA-16699:
----------------------------------------

Yay yay! I'm happy this is getting fixed :) 

> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-16699
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16699
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Walker Carlson
>            Assignee: Walker Carlson
>            Priority: Major
>
> KStreams is able to handle ProducerFencedException (among other errors)
> cleanly. It does this by closing the task dirty and triggering a rebalance in
> which the worker threads rejoin the group; the producer is also recreated.
> Because of how Streams works (it reads from and writes to various topics),
> the application is able to figure out the last thing the fenced producer
> completed and continue from there.
> KStreams EOS V2 also trusts that the server will abort any open transaction
> (including one whose producer has been fenced). This is key to how it
> operates. In EOS V1, the new producer's InitProducerId request fences the
> previous producer and aborts its transaction. In either case, we are able to
> reason about the last valid state of the fenced producer and how to proceed.
> h2. InvalidPidMappingException ≈ ProducerFenced
> I argue that InvalidPidMappingException can be handled in the same way. Let 
> me explain why.
> There are two cases in which we see this error:
>  # {{txnManager.getTransactionState(transactionalId).flatMap { case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
>  # {{if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> h3. Case 1
> We are missing a value in the transactional state map for the transactional
> ID. Under normal operation, this is only possible when the transactional ID
> has expired after {{transactional.id.expiration.ms}} of inactivity. In this
> case, there is no state that needs to be reconciled. It is safe to just
> rebalance and rejoin the group with a new producer. We probably don’t even
> need to close the task dirty, but it doesn’t hurt to do so.
> h3. Case 2
> This is a bit more interesting. It says that we have transactional state, but 
> the producer ID in the request does not match the producer ID associated with 
> the transactional ID on the broker. How can this happen?
> It is possible that a new producer instance B with the same transactional ID
> was created after the transactional state for instance A expired. Because
> there is no state on the server when B initializes, it gets a completely new
> producer ID. If the original producer A comes back, the broker has state for
> this transactional ID, but A’s requests carry the old producer ID.
> In this case, the old producer ID is effectively fenced; the normal
> epoch-based fencing logic just doesn’t apply. We can nevertheless handle it
> the same way.
> h2. Summary
> As described in the cases above, any time we encounter
> InvalidPidMappingException during normal operation, the previous producer has
> either finished its operations or been fenced. Thus, it is safe to close the
> task dirty and rebalance + rejoin the group, just as we do with
> ProducerFencedException.
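
The case analysis above can be modelled with a small, self-contained Java sketch. It is not Kafka code: `Coordinator`, `TxnMetadata`, `TxnError`, `StreamsAction`, and `onError` are hypothetical names. The `TxnError` enum only mirrors the error names used in this issue (it is not `org.apache.kafka.common.protocol.Errors`). The coordinator is reduced to a map keyed by transactional ID, with a single timeout standing in for transactional.id.expiration.ms. It shows that both INVALID_PRODUCER_ID_MAPPING cases and ordinary epoch fencing can be routed to the same Streams-side action.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the checks discussed in KAFKA-16699.
// Not broker or Streams code; names only mirror the error codes in the issue.
final class InvalidPidMappingSketch {

    enum TxnError { NONE, INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED }

    enum StreamsAction { CONTINUE, CLOSE_DIRTY_AND_REBALANCE }

    // Per-transactional-ID state kept by the modelled coordinator.
    record TxnMetadata(long producerId, short epoch, long lastUpdateMs) {}

    static final class Coordinator {
        private final Map<String, TxnMetadata> state = new HashMap<>();
        private final long expirationMs; // stands in for transactional.id.expiration.ms
        private long nextProducerId = 1000L;

        Coordinator(long expirationMs) { this.expirationMs = expirationMs; }

        // InitProducerId: a brand-new producer ID if there is no state,
        // otherwise the same producer ID with a bumped epoch (fencing the old one).
        TxnMetadata initProducerId(String txnId, long nowMs) {
            TxnMetadata old = state.get(txnId);
            TxnMetadata fresh = (old == null)
                    ? new TxnMetadata(nextProducerId++, (short) 0, nowMs)
                    : new TxnMetadata(old.producerId(), (short) (old.epoch() + 1), nowMs);
            state.put(txnId, fresh);
            return fresh;
        }

        // Drop state for transactional IDs idle for longer than expirationMs.
        void expire(long nowMs) {
            state.values().removeIf(m -> nowMs - m.lastUpdateMs() > expirationMs);
        }

        // The two INVALID_PRODUCER_ID_MAPPING cases, plus ordinary epoch fencing.
        TxnError validate(String txnId, long producerId, short epoch) {
            TxnMetadata m = state.get(txnId);
            if (m == null) {
                return TxnError.INVALID_PRODUCER_ID_MAPPING; // Case 1: state expired
            }
            if (m.producerId() != producerId) {
                return TxnError.INVALID_PRODUCER_ID_MAPPING; // Case 2: ID re-created
            }
            if (m.epoch() > epoch) {
                return TxnError.PRODUCER_FENCED; // normal epoch-based fencing
            }
            return TxnError.NONE;
        }
    }

    // Proposed Streams-side handling: both errors take the ProducerFenced path.
    static StreamsAction onError(TxnError e) {
        switch (e) {
            case PRODUCER_FENCED:
            case INVALID_PRODUCER_ID_MAPPING:
                return StreamsAction.CLOSE_DIRTY_AND_REBALANCE;
            default:
                return StreamsAction.CONTINUE;
        }
    }

    public static void main(String[] args) {
        Coordinator coord = new Coordinator(60_000L);
        TxnMetadata a = coord.initProducerId("app-1", 0L); // producer A
        coord.expire(120_000L);                              // A was idle too long
        TxnError case1 = coord.validate("app-1", a.producerId(), a.epoch());
        coord.initProducerId("app-1", 120_000L);            // producer B gets a new PID
        TxnError case2 = coord.validate("app-1", a.producerId(), a.epoch());
        System.out.println(case1 + " " + case2 + " -> " + onError(case2));
    }
}
```

Running `main` walks producer A through Case 1 (its state expired) and then Case 2 (producer B re-created the transactional ID and received a new producer ID). Under the sketch's assumptions, both map to the same close-dirty-and-rebalance action used for PRODUCER_FENCED.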



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
