TaiJuWu commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2637811939
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -33,4 +45,185 @@ public static LogSegment createSegment(long offset, File
logDir, int indexInterv
// Create and return the LogSegment instance
return new LogSegment(ms, idx, timeIdx, txnIndex, offset,
indexIntervalBytes, 0, time);
}
+
+
+ /**
+ * Append an end transaction marker (commit or abort) to the log as a
leader.
+ *
+ * @param transactionVersion the transaction version (1 = TV1, 2 = TV2)
etc. Must be explicitly specified.
+ * TV2 markers require strict epoch validation
(markerEpoch > currentEpoch),
+ * while legacy markers use relaxed validation
(markerEpoch >= currentEpoch).
+ */
+ public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ ControlRecordType
controlType,
+ long timestamp,
+ int
coordinatorEpoch,
+ int leaderEpoch,
+ short
transactionVersion) {
+ MemoryRecords records = endTxnRecords(controlType, producerId,
producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+ return log.appendAsLeader(records, leaderEpoch,
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL,
transactionVersion);
+ }
+
+ public static MemoryRecords endTxnRecords(ControlRecordType
controlRecordType,
+ long producerId,
+ short epoch,
+ long offset,
+ int coordinatorEpoch,
+ int partitionLeaderEpoch,
+ long timestamp) {
+ EndTransactionMarker marker = new
EndTransactionMarker(controlRecordType, coordinatorEpoch);
+ return MemoryRecords.withEndTransactionMarker(offset, timestamp,
partitionLeaderEpoch, producerId, epoch, marker);
+ }
+
+ @SuppressWarnings("ParameterNumber")
+ public static UnifiedLog createLog(File dir,
+ LogConfig config,
+ BrokerTopicStats brokerTopicStats,
+ Scheduler scheduler,
+ Time time,
+ long logStartOffset,
+ long recoveryPoint,
+ int maxTransactionTimeoutMs,
+ ProducerStateManagerConfig
producerStateManagerConfig,
+ int producerIdExpirationCheckIntervalMs,
+ boolean lastShutdownClean,
+ Optional<Uuid> topicId,
+ ConcurrentMap<String, Integer>
numRemainingSegments,
+ boolean remoteStorageSystemEnable,
+ LogOffsetsListener logOffsetsListener)
throws IOException {
+ return UnifiedLog.create(
+ dir,
+ config,
+ logStartOffset,
+ recoveryPoint,
+ scheduler,
+ brokerTopicStats,
+ time,
+ maxTransactionTimeoutMs,
+ producerStateManagerConfig,
+ producerIdExpirationCheckIntervalMs,
+ new LogDirFailureChannel(10),
+ lastShutdownClean,
+ topicId,
+ numRemainingSegments,
+ remoteStorageSystemEnable,
+ logOffsetsListener
+ );
+ }
+
+ public static class LogConfigBuilder {
Review Comment:
Thanks point out, it actually simply the logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]