mjsax commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2048203504


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -772,6 +779,14 @@ synchronized void handleFailedBatch(ProducerBatch batch, 
RuntimeException except
         }
     }
 
+    /**
+     * Returns {@code true} if the given {@link ProducerBatch} has the same 
producer ID but a different epoch than the
+     * {@link #producerIdAndEpoch cached producer ID and epoch}.
+     */
+    synchronized boolean isStaleBatch(ProducerBatch batch) {

Review Comment:
   Does this method need to by `synchronized`?
   
   Also seems it could be `private`?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -737,14 +737,21 @@ public synchronized void 
maybeTransitionToErrorState(RuntimeException exception)
     }
 
     synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException 
exception, boolean adjustSequenceNumbers) {
-        maybeTransitionToErrorState(exception);
+        if (!isStaleBatch(batch) && !hasFatalError())

Review Comment:
   Not sure if I understand the `!hasFatalError()` condition. Can you 
elaborate? -- I thought we want to call 
`maybeTransitionToErrorState(exception);` for any non-stale batch, independent 
of the current error state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to