kirktrue commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2143291551
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -790,14 +790,34 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
     }
 
     synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
-        maybeTransitionToErrorState(exception);
+        // Compare the batch with the current ProducerIdAndEpoch. If the producer IDs are the *same* but the epochs
+        // are *different*, consider the batch as stale. This means this batch will not cause a change to either the
+        // state or the sequence numbers; the stale batch is only removed from the inflight set.
+        boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch;
+
+        // It's possible the transaction manager is already in the FATAL_ERROR state at this point. Depending on the
+        // incoming exception type, maybeTransitionToErrorState() could attempt to set the state to ABORTABLE_ERROR.
+        // For example, suppose a fatal error occurred during a transaction, and then moments later one of the batches
+        // in that transaction failed with a TimeoutException. maybeTransitionToErrorState() would then (blindly)
+        // attempt to transition to ABORTABLE_ERROR, which is invalid and would result in an IllegalStateException.
+        //
+        // Therefore, only attempt to transition to the FATAL_ERROR state if the batch is "fresh" *and* the
+        // transaction manager is not already in the FATAL_ERROR state.
+        if (!isStaleBatch && !hasFatalError())
+            maybeTransitionToErrorState(exception);
+
         removeInFlightBatch(batch);
         if (hasFatalError()) {
             log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
                     "since the producer is already in fatal error state", batch, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), exception);
             return;
+        } else if (isStaleBatch) {
+            log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +

Review Comment:
   That's a good question, @mjsax. IMO, `TRACE` is only for developers. No organization will set logging to `TRACE` in production, at least not long enough to hit this issue and see the log. But honestly, the same is true for `DEBUG` 🤔 Can we leave it at `DEBUG` and let end users tell us to turn it down?
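The stale-batch guard in the diff can be illustrated with a small standalone sketch. It assumes a simplified state model. The class `StaleBatchSketch`, its `State` enum and the boolean `fatalException` flag are hypothetical stand-ins for `TransactionManager`, its real state machine and the exception classification done by `maybeTransitionToErrorState()`. None of them are Kafka APIs. The sketch shows the two rules from the diff:

* A batch with the same producer ID but a different epoch is stale and leaves the state untouched.
* A fresh batch does not attempt a transition once the state is already `FATAL_ERROR`.

```java
// A minimal sketch, assuming a simplified state model. StaleBatchSketch, State and
// the fatalException flag are hypothetical; they only mirror the shape of the diff.
public class StaleBatchSketch {

    // Hypothetical subset of transaction manager states.
    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private final long producerId;
    private final short epoch;
    private State state = State.IN_TRANSACTION;

    StaleBatchSketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Same producer ID but a different epoch: the batch belongs to another epoch
    // of this producer, so it is treated as stale. A different producer ID is not stale.
    static boolean isStaleBatch(long batchProducerId, short batchEpoch,
                                long currentProducerId, short currentEpoch) {
        return batchProducerId == currentProducerId && batchEpoch != currentEpoch;
    }

    // Mirrors the guard in the diff: only fresh batches may change the state,
    // and no transition is attempted once the state is FATAL_ERROR.
    State handleFailedBatch(long batchProducerId, short batchEpoch, boolean fatalException) {
        boolean stale = isStaleBatch(batchProducerId, batchEpoch, producerId, epoch);
        if (!stale && state != State.FATAL_ERROR) {
            // Hypothetical stand-in for maybeTransitionToErrorState(): fatal errors go to
            // FATAL_ERROR, everything else (e.g. a timeout) goes to ABORTABLE_ERROR.
            state = fatalException ? State.FATAL_ERROR : State.ABORTABLE_ERROR;
        }
        return state;
    }

    public static void main(String[] args) {
        StaleBatchSketch tm = new StaleBatchSketch(42L, (short) 3);
        // Batch from epoch 2 of the same producer is stale: state is unchanged.
        System.out.println(tm.handleFailedBatch(42L, (short) 2, false)); // IN_TRANSACTION
        // Fresh batch failing with a fatal error moves the state to FATAL_ERROR.
        System.out.println(tm.handleFailedBatch(42L, (short) 3, true));  // FATAL_ERROR
        // A later abortable (timeout-style) failure does not try to leave FATAL_ERROR.
        System.out.println(tm.handleFailedBatch(42L, (short) 3, false)); // FATAL_ERROR
    }
}
```

The last call shows the sequence described in the diff's comment. Without the `state != State.FATAL_ERROR` check, the sketch would step from `FATAL_ERROR` to `ABORTABLE_ERROR`. The real state machine rejects that transition with an `IllegalStateException`.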