kirktrue commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2143291551


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -790,14 +790,34 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
     }
 
     synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
-        maybeTransitionToErrorState(exception);
+        // Compare the batch with the current ProducerIdAndEpoch. If the producer IDs are the *same* but the epochs
+        // are *different*, consider the batch as stale. This means this batch will not cause a change to either the
+        // state or the sequence numbers; the stale batch is only removed from the inflight set.
+        boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch;
+
+        // It's possible the transaction manager is already in the FATAL_ERROR state at this point. Depending on the
+        // incoming exception type, maybeTransitionToErrorState() could attempt to set the state to ABORTABLE_ERROR.
+        // For example, suppose a fatal error occurred during a transaction, and then moments later one of the batches
+        // in that transaction failed with a TimeoutException. maybeTransitionToErrorState() would then (blindly)
+        // attempt to transition to ABORTABLE_ERROR, which is invalid and would result in an IllegalStateException.
+        //
+        // Therefore, only attempt to transition to the FATAL_ERROR state if the batch is "fresh" *and* the
+        // transaction manager is not already in the FATAL_ERROR state.
+        if (!isStaleBatch && !hasFatalError())
+            maybeTransitionToErrorState(exception);
+
         removeInFlightBatch(batch);
 
         if (hasFatalError()) {
             log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
                             "since the producer is already in fatal error state", batch, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), exception);
             return;
+        } else if (isStaleBatch) {
+            log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +

Review Comment:
   That's a good question, @mjsax.
   
   IMO, `TRACE` is only for developers. No organization will set logging to 
`TRACE` in production, at least not long enough to hit this issue and see the 
log. But honestly, the same is true for `DEBUG` 🤔
   
   Can we leave it at `DEBUG` and let end users tell us to turn it down?
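
For reference, the stale-batch check and the transition guard from the hunk above can be sketched in isolation. This is only an illustration under assumptions: `StaleBatchSketch`, `isStale`, and `shouldAttemptTransition` are hypothetical names that are not part of `TransactionManager`, and the real method works on `ProducerBatch` and `ProducerIdAndEpoch` objects rather than raw values.

```java
public class StaleBatchSketch {

    // Hypothetical helper: a batch is stale when it carries the same producer ID
    // as the current ProducerIdAndEpoch but a different epoch.
    public static boolean isStale(long batchProducerId, short batchEpoch,
                                  long currentProducerId, short currentEpoch) {
        return batchProducerId == currentProducerId && batchEpoch != currentEpoch;
    }

    // Hypothetical helper mirroring the guard in the hunk: only attempt an error-state
    // transition for a fresh batch when no fatal error has been recorded yet.
    public static boolean shouldAttemptTransition(boolean staleBatch, boolean hasFatalError) {
        return !staleBatch && !hasFatalError;
    }

    public static void main(String[] args) {
        long currentId = 42L;
        short currentEpoch = 5;

        // Same producer ID, older epoch: stale, so no state transition is attempted.
        boolean stale = isStale(42L, (short) 4, currentId, currentEpoch);
        System.out.println("stale=" + stale
                + " attempt=" + shouldAttemptTransition(stale, false));

        // Same producer ID and epoch: fresh, but a fatal error already exists.
        boolean fresh = isStale(42L, (short) 5, currentId, currentEpoch);
        System.out.println("stale=" + fresh
                + " attempt=" + shouldAttemptTransition(fresh, true));
    }
}
```

In both cases shown in `main` the transition is skipped, which is the situation where the only visible trace would be the `DEBUG` log line under discussion.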



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
