[ https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845497#comment-17845497 ]
Justine Olshan commented on KAFKA-16699:
----------------------------------------
Yay yay! I'm happy this is getting fixed :)
> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> ----------------------------------------------------------------------------
>
> Key: KAFKA-16699
> URL: https://issues.apache.org/jira/browse/KAFKA-16699
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Walker Carlson
> Assignee: Walker Carlson
> Priority: Major
>
> KStreams is able to handle ProducerFencedException (among other errors)
> cleanly. It does this by closing the task dirty, recreating the producer, and
> triggering a rebalance so that the worker threads rejoin the group. Because
> Streams keeps its progress in Kafka topics (it reads from and writes to
> various topics), the application can determine the last thing the fenced
> producer completed and continue from there.
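As a rough illustration of why closing a task dirty loses nothing, here is a minimal sketch under simplifying assumptions: a task's progress is just its committed input offset, and anything processed inside the open transaction is discarded when the producer is fenced. {{TaskModel}} and {{RecoveryDemo}} are hypothetical names, not Kafka Streams classes.

```scala
// Hypothetical model of a Streams task under EOS; not Kafka Streams code.
// committedOffset is the next input offset to read after the last successful commit.
final case class TaskModel(committedOffset: Long, uncommitted: Vector[Long]) {
  // Offset of the next record this task will read.
  def position: Long = committedOffset + uncommitted.size

  // Process the next input record inside the currently open transaction.
  def process(): TaskModel = copy(uncommitted = uncommitted :+ position)

  // Committing makes the in-flight output durable together with the input offsets.
  def commit(): TaskModel = TaskModel(position, Vector.empty)

  // Closing dirty after fencing: the open transaction is aborted,
  // so in-flight work is dropped and the task restarts from the committed offset.
  def closeDirty(): TaskModel = copy(uncommitted = Vector.empty)
}

object RecoveryDemo {
  // Returns (position just before fencing, position after closing dirty).
  def run(): (Long, Long) = {
    val task = TaskModel(0L, Vector.empty).process().process().commit().process()
    (task.position, task.closeDirty().position)
  }
}
```

Here {{RecoveryDemo.run()}} returns (3, 2): the task had read up to offset 3, and after closing dirty it resumes at offset 2, reprocessing the one record whose transaction was aborted.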
> KStreams EOS V2 also trusts that any open transaction (including one whose
> producer has been fenced) will be aborted by the server. This guarantee is key
> to how it operates. In EOS V1, the new producer's InitProducerId request fences
> the old producer and aborts its open transaction. In either case, we can reason
> about the last valid state left by the fenced producer and how to proceed.
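A minimal sketch of the EOS V1 behavior described above, assuming a single transactional ID and ignoring most details of the real coordinator. {{EpochFencing}}, {{TxnState}}, {{initProducerId}}, and {{check}} are hypothetical names, not broker code.

```scala
// Hypothetical, simplified model of epoch-based fencing (EOS V1 style); not broker code.
object EpochFencing {
  sealed trait Result
  case object Ok extends Result
  case object ProducerFenced extends Result

  // Coordinator-side state for one transactional ID.
  final case class TxnState(producerId: Long, epoch: Short, ongoing: Boolean)

  // A new producer instance with the same transactional ID sends InitProducerId:
  // the epoch is bumped and the old instance's open transaction is aborted.
  def initProducerId(state: TxnState): TxnState =
    state.copy(epoch = (state.epoch + 1).toShort, ongoing = false)

  // Any request still carrying an older epoch is rejected as fenced.
  def check(state: TxnState, epoch: Short): Result =
    if (epoch < state.epoch) ProducerFenced else Ok
}
```

Starting from a state with epoch 0 and an open transaction, {{initProducerId}} yields epoch 1 with no open transaction, and {{check}} then returns {{ProducerFenced}} for any request that still carries epoch 0.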
> h2. InvalidPidMappingException ≈ ProducerFenced
> I argue that InvalidPidMappingException can be handled in the same way. Let
> me explain why.
> There are two cases in which we see this error:
> # {{txnManager.getTransactionState(transactionalId).flatMap { case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> # {{if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
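The two checks can be modeled roughly as follows. This is a simplified, hypothetical sketch: the real coordinator keeps much more state and can return other errors; {{PidMappingCheck}}, {{TxnMetadata}}, and {{validate}} are illustrative names only, and placing the ordinary epoch check after the two mapping checks is an assumption of this sketch.

```scala
// Hypothetical, simplified model of the two coordinator checks; not broker code.
object PidMappingCheck {
  sealed trait TxnError
  case object InvalidProducerIdMapping extends TxnError
  case object ProducerFenced extends TxnError

  final case class TxnMetadata(producerId: Long, producerEpoch: Short)

  def validate(
      state: Map[String, TxnMetadata],
      transactionalId: String,
      producerId: Long,
      producerEpoch: Short
  ): Either[TxnError, TxnMetadata] =
    state.get(transactionalId) match {
      // Case 1: no state at all for this transactional ID.
      case None => Left(InvalidProducerIdMapping)
      // Case 2: state exists, but it belongs to a different producer ID.
      case Some(md) if md.producerId != producerId => Left(InvalidProducerIdMapping)
      // Same producer ID but an older epoch: the usual epoch-based fencing.
      case Some(md) if producerEpoch < md.producerEpoch => Left(ProducerFenced)
      case Some(md) => Right(md)
    }
}
```

Both a missing entry and a mismatched producer ID produce the same {{InvalidProducerIdMapping}} error, so the client cannot tell Case 1 from Case 2 by the error alone.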
> h3. Case 1
> The transactional state map has no entry for the transactional ID. Under
> normal operation, this only happens when the transactional ID has expired
> after {{transactional.id.expiration.ms}} of inactivity. In this case, there is
> no state that needs to be reconciled, so it is safe to just rebalance and
> rejoin the group with a new producer. We probably don't even need to close the
> task dirty, but it doesn't hurt to do so.
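A minimal sketch of Case 1, assuming expiration simply drops any entry that has been idle for longer than the configured interval (the real coordinator applies additional conditions). {{TxnIdExpiration}}, {{expire}}, and {{lookup}} are hypothetical names; {{expirationMs}} stands in for {{transactional.id.expiration.ms}}.

```scala
// Hypothetical model of transactional ID expiration; not broker code.
object TxnIdExpiration {
  final case class Entry(producerId: Long, lastUpdateMs: Long)

  // Keep only entries that have been updated within the expiration interval.
  def expire(state: Map[String, Entry], nowMs: Long, expirationMs: Long): Map[String, Entry] =
    state.filter { case (_, e) => nowMs - e.lastUpdateMs <= expirationMs }

  // After expiration there is nothing to look up, so the request fails with the mapping error.
  def lookup(state: Map[String, Entry], transactionalId: String): Either[String, Entry] =
    state.get(transactionalId).toRight("INVALID_PRODUCER_ID_MAPPING")
}
```

With an interval of 500 ms evaluated at time 1000, an entry last updated at 0 is dropped while one updated at 900 survives; a later lookup of the dropped ID yields the mapping error, with no leftover state to reconcile.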
> h3. Case 2
> This is a bit more interesting. Here the broker does have transactional state,
> but the producer ID in the request does not match the producer ID associated
> with the transactional ID. How can this happen?
> A new producer instance B with the same transactional ID may be initialized
> after the transactional state for instance A has expired. Since there is no
> state on the server when B calls InitProducerId, B is assigned a brand-new
> producer ID. If the original producer A then comes back, the broker has state
> for this transactional ID (B's), but A's requests carry the old producer ID.
> In this case, the old producer ID is effectively fenced; the normal epoch-based
> fencing logic just doesn't apply because the producer IDs differ. We can still
> treat it the same way.
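The Case 2 timeline can be replayed against a toy coordinator. This is a hypothetical sketch: {{Case2Timeline}} and {{Coordinator}} are illustrative names, producer IDs are allocated sequentially from an arbitrary 1000, and the transactional ID {{app-0_0}} is made up.

```scala
// Hypothetical toy coordinator used to replay the Case 2 timeline; not broker code.
object Case2Timeline {
  final case class Meta(producerId: Long, epoch: Short)

  final class Coordinator(private var nextProducerId: Long) {
    private var state: Map[String, Meta] = Map.empty

    // InitProducerId: with no existing state a brand-new producer ID is allocated;
    // otherwise the existing producer ID is kept and the epoch is bumped.
    def initProducerId(txnId: String): Meta = {
      val meta = state.get(txnId) match {
        case None =>
          val m = Meta(nextProducerId, 0.toShort)
          nextProducerId += 1
          m
        case Some(m) => m.copy(epoch = (m.epoch + 1).toShort)
      }
      state = state.updated(txnId, meta)
      meta
    }

    // Expiration removes all state for the transactional ID.
    def expire(txnId: String): Unit = state = state - txnId

    // The two mapping checks followed by ordinary epoch-based fencing.
    def validate(txnId: String, producer: Meta): String =
      state.get(txnId) match {
        case None => "INVALID_PRODUCER_ID_MAPPING"
        case Some(m) if m.producerId != producer.producerId => "INVALID_PRODUCER_ID_MAPPING"
        case Some(m) if producer.epoch < m.epoch => "PRODUCER_FENCED"
        case Some(_) => "OK"
      }
  }

  def run(): String = {
    val coordinator = new Coordinator(nextProducerId = 1000L)
    val a = coordinator.initProducerId("app-0_0") // A gets producer ID 1000
    coordinator.expire("app-0_0")                  // A stays idle past the expiration interval
    coordinator.initProducerId("app-0_0")          // B gets a brand-new producer ID, 1001
    coordinator.validate("app-0_0", a)             // A comes back with the old producer ID
  }
}
```

{{Case2Timeline.run()}} returns INVALID_PRODUCER_ID_MAPPING: B owns producer ID 1001, so A's request with producer ID 1000 fails the mapping check even though its epoch alone would not trigger epoch-based fencing.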
> h2. Summary
> As described in the cases above, any time we encounter InvalidPidMappingException
> during normal operation, the previous producer has either finished its work or
> been fenced. Thus, it is safe to close the task dirty, rebalance, and rejoin the
> group, just as we do for ProducerFencedException.
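The proposed change boils down to routing one more error into the existing fenced-producer path. A minimal sketch, using hypothetical stand-ins rather than the real Kafka exception classes ({{StreamsErrorHandling}}, {{ProducerError}}, {{Action}}, and {{handle}} are illustrative names):

```scala
// Hypothetical sketch of the proposed error routing; not Kafka Streams code.
object StreamsErrorHandling {
  // Stand-ins for producer errors; not the real exception classes.
  sealed trait ProducerError
  case object ProducerFenced extends ProducerError
  case object InvalidPidMapping extends ProducerError
  case object OtherFatal extends ProducerError

  sealed trait Action
  // Close the task dirty, recreate the producer, and rebalance to rejoin the group.
  case object CloseDirtyAndRejoin extends Action
  // Errors not known to be fencing-style are surfaced instead.
  case object Fail extends Action

  // Proposed handling: InvalidPidMapping takes the same path as ProducerFenced.
  def handle(error: ProducerError): Action = error match {
    case ProducerFenced | InvalidPidMapping => CloseDirtyAndRejoin
    case OtherFatal => Fail
  }
}
```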
--
This message was sent by Atlassian Jira
(v8.20.10#820010)