TaiJuWu commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2642099544


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -33,4 +45,155 @@ public static LogSegment createSegment(long offset, File 
logDir, int indexInterv
         // Create and return the LogSegment instance
         return new LogSegment(ms, idx, timeIdx, txnIndex, offset, 
indexIntervalBytes, 0, time);
     }
+
+
+    /**
+     * Append an end transaction marker (commit or abort) to the log as a 
leader.
+     *
+     * @param transactionVersion the transaction version (1 = TV1, 2 = TV2) 
etc. Must be explicitly specified.
+     *                          TV2 markers require strict epoch validation 
(markerEpoch > currentEpoch),
+     *                          while legacy markers use relaxed validation 
(markerEpoch >= currentEpoch).
+     */
+    public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+                                                           long producerId,
+                                                           short producerEpoch,
+                                                           ControlRecordType 
controlType,
+                                                           long timestamp,
+                                                           int 
coordinatorEpoch,
+                                                           int leaderEpoch,
+                                                           short 
transactionVersion) {
+        MemoryRecords records = endTxnRecords(controlType, producerId, 
producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+        return log.appendAsLeader(records, leaderEpoch, 
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
transactionVersion);
+    }
+
+    public static MemoryRecords endTxnRecords(ControlRecordType 
controlRecordType,
+                                              long producerId,
+                                              short epoch,
+                                              long offset,
+                                              int coordinatorEpoch,
+                                              int partitionLeaderEpoch,
+                                              long timestamp) {
+        EndTransactionMarker marker = new 
EndTransactionMarker(controlRecordType, coordinatorEpoch);
+        return MemoryRecords.withEndTransactionMarker(offset, timestamp, 
partitionLeaderEpoch, producerId, epoch, marker);
+    }
+
+    @SuppressWarnings("ParameterNumber")
+    public static UnifiedLog createLog(File dir,

Review Comment:
   remove this method and use UnifiedLog.create() directly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to