chia7712 commented on code in PR #20577:
URL: https://github.com/apache/kafka/pull/20577#discussion_r2376317631


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1067,8 +1067,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
           // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
           // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
-          if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
-            throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
+          if (batch.isTransactional && !hasOngoingTransaction(batch.producerId)) {
+            // Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
+            val entry = producerStateManager.activeProducers.get(batch.producerId)
+            if (entry != null && batch.producerEpoch < entry.producerEpoch) {

Review Comment:
   I left some comments on the previous PR. Please take a look when you have 
time. Thanks!
   
   https://github.com/apache/kafka/pull/20534#pullrequestreview-3263653976



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
