mjsax commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2141309397
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -790,14 +790,34 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
     }
 
     synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
-        maybeTransitionToErrorState(exception);
+        // Compare the batch with the current ProducerIdAndEpoch. If the producer IDs are the *same* but the epochs
+        // are *different*, consider the batch as stale. This means this batch will not cause a change to either the
+        // state or the sequence numbers; the stale batch is only removed from the inflight set.
+        boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch;
+
+        // It's possible the transaction manager is already in the FATAL_ERROR state at this point. Depending on the
+        // incoming exception type, maybeTransitionToErrorState() could attempt to set the state to ABORTABLE_ERROR.
+        // For example, suppose a fatal error occurred during a transaction, and then moments later one of the batches
+        // in that transaction failed with a TimeoutException. maybeTransitionToErrorState() would then (blindly)
+        // attempt to transition to ABORTABLE_ERROR, which is invalid and would result in an IllegalStateException.
+        //
+        // Therefore, only attempt to transition to the FATAL_ERROR state if the batch is "fresh" *and* the
+        // transaction manager is not already in the FATAL_ERROR state.
+        if (!isStaleBatch && !hasFatalError())
+            maybeTransitionToErrorState(exception);
+
         removeInFlightBatch(batch);
         if (hasFatalError()) {
             log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
                     "since the producer is already in fatal error state", batch, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), exception);
             return;
+        } else if (isStaleBatch) {
+            log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +

Review Comment:
   Just wondering if this should be TRACE? It doesn't seem important enough for DEBUG (I'm not worried about volume, since this should happen only rarely; I'm just wondering about its importance).

--
This is an automated message from the Apache Git Service.
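To make the guard in this diff concrete, here is a minimal, self-contained model of its logic. It is a sketch under stated assumptions, not Kafka's TransactionManager: the classes `StaleBatchSketch`, `Batch`, `Manager`, and `State` are hypothetical; the `fatal` flag stands in for the exception-type inspection the real `maybeTransitionToErrorState()` performs; and the transition rule (nothing may leave FATAL_ERROR) is a simplification of the real state machine. It models the two cases the diff comments describe: a batch with the same producer ID but a different epoch is removed from the in-flight set without touching state, and a fresh batch that fails after a fatal error does not attempt the invalid ABORTABLE_ERROR transition.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified model of the guard in handleFailedBatch().
// None of these types are Kafka's; they only borrow names from the diff.
public class StaleBatchSketch {

    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    // Stand-in for ProducerBatch: only the fields the staleness check reads.
    static final class Batch {
        final long producerId;
        final short producerEpoch;

        Batch(long producerId, short producerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }
    }

    static final class Manager {
        private final long producerId; // stand-in for producerIdAndEpoch.producerId
        private final short epoch;     // stand-in for producerIdAndEpoch.epoch
        State state = State.IN_TRANSACTION;
        final Set<Batch> inFlight = new HashSet<>();

        Manager(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }

        // Assumed simplification: once FATAL_ERROR is reached, any other target is rejected.
        private void transitionTo(State target) {
            if (state == State.FATAL_ERROR && target != State.FATAL_ERROR) {
                throw new IllegalStateException("Invalid transition from FATAL_ERROR to " + target);
            }
            state = target;
        }

        // 'exception' is unused; it only mirrors the signature in the diff.
        // 'fatal' replaces the exception-type inspection of the real maybeTransitionToErrorState().
        void handleFailedBatch(Batch batch, RuntimeException exception, boolean fatal) {
            // Same producer ID, different epoch: the batch belongs to an earlier epoch.
            boolean isStaleBatch = batch.producerId == producerId && batch.producerEpoch != epoch;
            // Only fresh batches may change state, and never once FATAL_ERROR is set.
            if (!isStaleBatch && state != State.FATAL_ERROR) {
                transitionTo(fatal ? State.FATAL_ERROR : State.ABORTABLE_ERROR);
            }
            inFlight.remove(batch);
        }
    }

    public static void main(String[] args) {
        // Stale batch: epoch 2 while the manager is on epoch 3, so state is untouched.
        Manager a = new Manager(42L, (short) 3);
        Batch stale = new Batch(42L, (short) 2);
        a.inFlight.add(stale);
        a.handleFailedBatch(stale, new RuntimeException("timeout"), false);
        System.out.println(a.state + " " + a.inFlight.size()); // IN_TRANSACTION 0

        // Fresh batch after a fatal error: the guard skips the ABORTABLE_ERROR attempt.
        Manager b = new Manager(42L, (short) 3);
        b.state = State.FATAL_ERROR;
        b.handleFailedBatch(new Batch(42L, (short) 3), new RuntimeException("timeout"), false);
        System.out.println(b.state); // FATAL_ERROR
    }
}
```

In this model, removing the `state != State.FATAL_ERROR` condition makes the second scenario in `main` throw `IllegalStateException`, which is the failure the diff comment describes. The stale-batch path only removes the batch from the in-flight set, which is why the log line under discussion is informational either way.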