hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509711302
##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }

-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()

Review comment:
You are right that the appends may still be committed, but in this patch, the `handleCommit` API is only invoked for appends within the current epoch. I thought it seemed simpler for now to just reset state at the start of the epoch. We can be more clever in the future once `handleCommit` is extended to account for replication.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
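
For readers following the thread, here is a minimal sketch of the reset-at-epoch-start approach described in the review comment. It is not the code from the patch: `PendingAppendTracker`, its methods, and the simplified `LeaderAndEpoch` and `PendingAppend` case classes are hypothetical stand-ins with no Kafka dependency. It assumes, as the comment states, that commits are only reported for appends made in the current epoch.

```scala
import java.util.ArrayDeque

// Hypothetical stand-ins for illustration only; these are NOT the Kafka Raft types.
final case class LeaderAndEpoch(leaderId: Option[Int], epoch: Int)
final case class PendingAppend(epoch: Int, offset: Long, appendTimeMs: Long)

class PendingAppendTracker(brokerId: Int) {
  private val pendingAppends = new ArrayDeque[PendingAppend]()
  private var latest = LeaderAndEpoch(None, 0)

  def isLeader: Boolean = latest.leaderId.contains(brokerId)
  def pendingCount: Int = pendingAppends.size

  // Call whenever the observed leader/epoch may have changed.
  def maybeUpdateLeader(current: LeaderAndEpoch): Unit = {
    if (current != latest) {
      latest = current
      if (isLeader) {
        // Appends from earlier epochs may still be committed, but under the
        // stated assumption no commit callback will arrive for them, so the
        // tracked state is simply reset when a new epoch of leadership begins.
        pendingAppends.clear()
      }
    }
  }

  // Track an append made while this node is the leader.
  def recordAppend(offset: Long, nowMs: Long): Unit = {
    if (isLeader) pendingAppends.offer(PendingAppend(latest.epoch, offset, nowMs))
  }

  // Complete every tracked append in `epoch` up to and including `lastOffset`,
  // returning the observed commit latencies in milliseconds.
  def handleCommit(epoch: Int, lastOffset: Long, nowMs: Long): List[Long] = {
    val latencies = List.newBuilder[Long]
    while (!pendingAppends.isEmpty &&
           pendingAppends.peek().epoch == epoch &&
           pendingAppends.peek().offset <= lastOffset) {
      val append = pendingAppends.poll()
      latencies += (nowMs - append.appendTimeMs)
    }
    latencies.result()
  }
}
```

The trade-off is the one noted in the comment: appends from an earlier epoch that do eventually commit are never counted, which keeps the bookkeeping simple until commit notifications cover replicated appends as well.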