mjsax commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2141309397


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -790,14 +790,34 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception)
     }
 
     synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
-        maybeTransitionToErrorState(exception);
+        // Compare the batch with the current ProducerIdAndEpoch. If the producer IDs are the *same* but the epochs
+        // are *different*, consider the batch as stale. This means this batch will not cause a change to either the
+        // state or the sequence numbers; the stale batch is only removed from the inflight set.
+        boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() != producerIdAndEpoch.epoch;
+
+        // It's possible the transaction manager is already in the FATAL_ERROR state at this point. Depending on the
+        // incoming exception type, maybeTransitionToErrorState() could attempt to set the state to ABORTABLE_ERROR.
+        // For example, suppose a fatal error occurred during a transaction, and then moments later one of the batches
+        // in that transaction failed with a TimeoutException. maybeTransitionToErrorState() would then (blindly)
+        // attempt to transition to ABORTABLE_ERROR, which is invalid and would result in an IllegalStateException.
+        //
+        // Therefore, only attempt to transition to the FATAL_ERROR state if the batch is "fresh" *and* the
+        // transaction manager is not already in the FATAL_ERROR state.
+        if (!isStaleBatch && !hasFatalError())
+            maybeTransitionToErrorState(exception);
+
         removeInFlightBatch(batch);
 
         if (hasFatalError()) {
             log.debug("Ignoring batch {} with producer id {}, epoch {}, and sequence number {} " +
                             "since the producer is already in fatal error state", batch, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), exception);
             return;
+        } else if (isStaleBatch) {
+            log.debug("Ignoring stale batch {} with producer id {}, epoch {}, and sequence number {} " +

Review Comment:
   Just wondering whether this should be TRACE. It doesn't seem important enough for DEBUG. I'm not worried about volume, since this should happen only rarely; I'm just wondering about its importance.



-- 
This is an automated message from the Apache Git Service.