hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509620795
########## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ########## @@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging { ) } - class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") { + class RaftWorkloadGenerator( + client: KafkaRaftClient[Array[Byte]], + time: Time, + brokerId: Int, + recordsPerSec: Int, + recordSize: Int + ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] { + + private val stats = new WriteStats(time, printIntervalMs = 5000) + private val payload = new Array[Byte](recordSize) + private val pendingAppends = new util.ArrayDeque[PendingAppend]() + + private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0) + private var isLeader = false + private var throttler: ThroughputThrottler = _ + private var recordCount = 0 + + override def doWork(): Unit = { + if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) { + latestLeaderAndEpoch = client.currentLeaderAndEpoch() + isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId + if (isLeader) { + pendingAppends.clear() + throttler = new ThroughputThrottler(time, recordsPerSec) + recordCount = 0 + } + } + + if (isLeader) { + recordCount += 1 + + val startTimeMs = time.milliseconds() + val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) { + time.milliseconds() + } else { + startTimeMs + } + + val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload)) + if (offset == null || offset == Long.MaxValue) { Review comment: I will try to document this better, but `Long.MaxValue` is how we decided to handle the case where the epoch in `scheduleAppend` does not match the current epoch. This can happen because the raft epoch is updated asynchronously and there is no way to ensure the state machine has seen the latest value. The expectation is that the state machine will update its uncommitted state with an offset which is impossible to become committed. After it observes the epoch change, this uncommitted state will be discarded. Note that although I added the explicit check here, it is not technically necessary. Let me consider removing it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org