jolshan commented on code in PR #20882:
URL: https://github.com/apache/kafka/pull/20882#discussion_r2534908527


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3708,22 +3722,161 @@ class UnifiedLogTest {
     buffer.flip()
     log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch)
 
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 2, 1)
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 2, 1)
+    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+      coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
+    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+      coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
     assertThrows(classOf[TransactionCoordinatorFencedException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1, 1))
+      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+        coordinatorEpoch = 1, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
+  }
+
+  @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with 
transactionVersion={0}")
+  @ValueSource(shorts = Array(1, 2))
+  def testEndTxnWithFencedProducerEpoch(transactionVersion: Short): Unit = {
+    val producerId = 1L
+    val epoch = 5.toShort
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
+    val log = createLog(logDir, logConfig)
+    
+    // First, write some transactional records to establish the current epoch
+    val records = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, epoch, 0,
+      new SimpleRecord("key".getBytes, "value".getBytes)
+    )
+    log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
+    
+    // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 and 
TV2
+    // TV0/TV1: markerEpoch < currentEpoch is rejected
+    // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
+    assertThrows(classOf[InvalidProducerEpochException],
+      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (epoch - 
1).toShort, 
+        ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
+        leaderEpoch = 0, transactionVersion = transactionVersion))
+    
+    // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
+    // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
+    // TV2: same epoch is rejected (requires strict >, markerEpoch > 
currentEpoch)
+    if (transactionVersion >= 2) {
+      // TV2: same epoch should be rejected
+      assertThrows(classOf[InvalidProducerEpochException],
+        () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, 
+          ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
+          leaderEpoch = 0, transactionVersion = transactionVersion))
+    } else {
+      // TV0/TV1: same epoch should be allowed
+      assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, 
producerId, epoch, 
+        ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
+        leaderEpoch = 0, transactionVersion = transactionVersion))
+    }
   }
 
   @Test
-  def testEndTxnWithFencedProducerEpoch(): Unit = {
+  def testTV2MarkerWithBumpedEpochSucceeds(): Unit = {
+    // Test that TV2 markers with bumped epochs (epoch + 1) are accepted 
(positive case)
+    // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so 
markerEpoch = currentEpoch + 1
+    val transactionVersion: Short = 2
     val producerId = 1L
     val epoch = 5.toShort
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
     val log = createLog(logDir, logConfig)
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1)
+    
+    // First, write some transactional records to establish the current epoch
+    val records = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, epoch, 0,
+      new SimpleRecord("key".getBytes, "value".getBytes)
+    )
+    log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
+    
+    // TV2: Verify that bumped epoch (epoch + 1) is accepted
+    val bumpedEpoch = (epoch + 1).toShort
+    assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, 
producerId, bumpedEpoch,
+      ControlRecordType.COMMIT, mockTime.milliseconds(), coordinatorEpoch = 1,
+      leaderEpoch = 0, transactionVersion = 
TransactionVersion.TV_2.featureLevel()))
+    
+    // Verify the marker was successfully appended by checking producer state
+    val producerState = 
log.producerStateManager.activeProducers.get(producerId)
+    assertNotNull(producerState)
+    // After a commit marker, the producer epoch should be updated to the 
bumped epoch for TV2
+    assertEquals(bumpedEpoch, producerState.producerEpoch)
+  }
 
-    assertThrows(classOf[InvalidProducerEpochException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (epoch - 
1).toShort, ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch 
= 1))
+  @Test
+  def testReplicationWithTVUnknownAllowed(): Unit = {

Review Comment:
   Do we also have a test that uses the unknown transaction version on the leader append path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to