[ 
https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-16699:
-----------------------------------
    Description: 
KStreams handles the ProducerFencedException (among other errors) cleanly. It 
does this by closing the task dirty and triggering a rebalance amongst the 
worker threads to rejoin the group. The producer is also recreated. Because of 
how Streams works (writing to and reading from various topics), the application 
can work out the last thing the fenced producer completed and continue from 
there.

KStreams EOS V2 also trusts that any open transaction (including those whose 
producer is fenced) will be aborted by the server. This is a key factor in how 
it is able to operate. In EOS V1, the new InitProducerId fences and aborts the 
previous transaction. In either case, we are able to reason about the last 
valid state from the fenced producer and how to proceed.
h2. InvalidPidMappingException ≈ ProducerFenced

I argue that InvalidPidMappingException can be handled in the same way. Let me 
explain why.

There are two cases in which we see this error:

 # {{txnManager.getTransactionState(transactionalId).flatMap {  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
 # {{if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}

h3. Case 1

We are missing a value in the transactional state map for the transactional ID. 
Under normal operations, this is only possible when the transactional ID 
expires after {{transactional.id.expiration.ms}} of inactivity. In this case, 
there is no state that needs to be reconciled. It is safe to just rebalance and 
rejoin the group with a new producer. We probably don’t even need to close the 
task dirty, but it doesn’t hurt to do so.
h3. Case 2

This is a bit more interesting. It says that we have transactional state, but 
the producer ID in the request does not match the producer ID associated with 
the transactional ID on the broker. How can this happen?

It is possible that a new producer instance B with the same transactional ID 
was created after the transactional state expired for instance A. Given there 
is no state on the server when B joins, it will get a totally new producer ID. 
If the original producer A comes back, it will have state for this 
transactional ID but the wrong producer ID.

In this case, the old producer ID is effectively fenced; it is just that the 
normal epoch-based fencing logic doesn’t apply. We can treat it the same way, 
however.
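
To make the two cases concrete, here is a minimal, self-contained sketch of 
the lookup. It is only a toy model, not the broker code: the class 
PidMappingSketch, its methods, and the transactional ID string are all 
hypothetical names made up for illustration, and producer epochs are not 
modelled at all.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the coordinator's transactionalId -> producerId state.
// Everything here is a hypothetical stand-in, not a Kafka class.
class PidMappingSketch {
    enum Result { OK, INVALID_PRODUCER_ID_MAPPING }

    private final Map<String, Long> txnState = new HashMap<>();
    private long nextProducerId = 1000L;

    // Models InitProducerId: with no existing state, a new producer ID is assigned;
    // with existing state, the registered ID is returned (epoch bumps are not modelled).
    long initProducerId(String transactionalId) {
        return txnState.computeIfAbsent(transactionalId, id -> nextProducerId++);
    }

    // Models the transactional ID expiring after a period of inactivity.
    void expire(String transactionalId) {
        txnState.remove(transactionalId);
    }

    // Models the two checks quoted in the ticket.
    Result validate(String transactionalId, long producerId) {
        Long registered = txnState.get(transactionalId);
        if (registered == null) {
            return Result.INVALID_PRODUCER_ID_MAPPING; // Case 1: no state for this ID
        }
        if (registered != producerId) {
            return Result.INVALID_PRODUCER_ID_MAPPING; // Case 2: state exists, ID differs
        }
        return Result.OK;
    }

    public static void main(String[] args) {
        PidMappingSketch coordinator = new PidMappingSketch();
        String txnId = "example-txn-id"; // hypothetical transactional ID

        long pidA = coordinator.initProducerId(txnId); // instance A registers
        coordinator.expire(txnId);                     // A goes idle, state expires
        System.out.println("Case 1, A after expiry: " + coordinator.validate(txnId, pidA));

        long pidB = coordinator.initProducerId(txnId); // instance B gets a new ID
        System.out.println("Case 2, A after B joins: " + coordinator.validate(txnId, pidA));
        System.out.println("Instance B: " + coordinator.validate(txnId, pidB));
    }
}
```

In both failing paths the returning instance A holds an ID the coordinator no 
longer associates with the transactional ID, which is why the ticket argues 
the error can be treated like fencing.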
h2. Summary

As described in the cases above, any time we encounter the 
InvalidPidMappingException during normal operation, the previous producer was 
either finished with its operations or was fenced. Thus, it is safe to close 
the task dirty and rebalance + rejoin the group, just as we do with the 
ProducerFencedException.
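
The proposal amounts to putting both errors on the same recovery path. A rough 
sketch of that idea follows. It does not show the actual Streams code: the 
nested exception classes are hypothetical stand-ins for the Kafka client 
exceptions of the same name (so the example runs without the Kafka jars), and 
the Action enum and classify method are invented for illustration.

```java
// Sketch only: routes both exceptions to one recovery action.
class FencedHandlingSketch {
    // Hypothetical stand-ins for the Kafka client exceptions named in the ticket.
    static class ProducerFencedException extends RuntimeException {}
    static class InvalidPidMappingException extends RuntimeException {}

    // Hypothetical outcomes; OTHER_HANDLING covers every error this ticket does not discuss.
    enum Action { CLOSE_DIRTY_AND_REJOIN, OTHER_HANDLING }

    static Action classify(RuntimeException error) {
        // Proposed change: InvalidPidMappingException takes the same branch as
        // ProducerFencedException (close the task dirty, rebalance, rejoin).
        if (error instanceof ProducerFencedException
                || error instanceof InvalidPidMappingException) {
            return Action.CLOSE_DIRTY_AND_REJOIN;
        }
        return Action.OTHER_HANDLING;
    }

    public static void main(String[] args) {
        System.out.println(classify(new ProducerFencedException()));
        System.out.println(classify(new InvalidPidMappingException()));
        System.out.println(classify(new IllegalStateException("unrelated")));
    }
}
```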

> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-16699
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16699
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Walker Carlson
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
