mjsax commented on code in PR #17022: URL: https://github.com/apache/kafka/pull/17022#discussion_r2048203504
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -772,6 +779,14 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except } } + /** + * Returns {@code true} if the given {@link ProducerBatch} has the same producer ID but a different epoch than the + * {@link #producerIdAndEpoch cached producer ID and epoch}. + */ + synchronized boolean isStaleBatch(ProducerBatch batch) { Review Comment: Does this method need to by `synchronized`? Also seems it could be `private`? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -737,14 +737,21 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception) } synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) { - maybeTransitionToErrorState(exception); + if (!isStaleBatch(batch) && !hasFatalError()) Review Comment: Not sure if I understand the `!hasFatalError()` condition. Can you elaborate? -- I thought we want to call `maybeTransitionToErrorState(exception);` for any non-stale batch, independent of the current error state? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org