hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509711302



##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()

Review comment:
       You are right that the appends may still be committed, but in this 
patch, the `handleCommit` API is only invoked for appends within the current 
epoch. I thought it seemed simpler for now to just reset state at the start of 
the epoch. We can be more clever in the future once `handleCommit` is extended 
to account for replication.
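
To illustrate the reasoning, here is a minimal, self-contained sketch of the "reset at the start of the epoch" approach. It is not the code from the patch: `PendingAppendTracker`, `onLeaderChange`, `recordAppend`, and `onCommit` are hypothetical names, and the sketch assumes (as described above) that commits are only ever reported for appends made in the current epoch.

```scala
import java.util.ArrayDeque

// Hypothetical stand-in for the PR's PendingAppend; fields are assumed.
final case class PendingAppend(epoch: Int, appendTimeMs: Long)

// Hypothetical helper that tracks appends awaiting a commit notification.
final class PendingAppendTracker(brokerId: Int) {
  private val pending = new ArrayDeque[PendingAppend]()
  private var currentEpoch = -1
  private var isLeader = false

  // Call whenever the observed leader or epoch changes.
  def onLeaderChange(leaderId: Option[Int], epoch: Int): Unit = {
    currentEpoch = epoch
    isLeader = leaderId.contains(brokerId)
    // Appends from earlier epochs may still be committed, but under the
    // stated assumption they are never reported back, so drop them here.
    pending.clear()
  }

  // Record an append made at `nowMs`; returns false if we are not the leader.
  def recordAppend(nowMs: Long): Boolean = {
    if (!isLeader) false
    else {
      pending.add(PendingAppend(currentEpoch, nowMs))
      true
    }
  }

  // Commit notification for `count` appends made in `epoch`.
  // Returns the observed latency in ms of each matched append.
  def onCommit(epoch: Int, count: Int, nowMs: Long): Vector[Long] = {
    if (epoch != currentEpoch) Vector.empty // stale epoch: ignore
    else {
      val n = math.min(count, pending.size)
      Vector.fill(n)(nowMs - pending.poll().appendTimeMs)
    }
  }

  def pendingCount: Int = pending.size
}
```

The trade-off is that latency for appends still in flight at an epoch boundary is never recorded, which keeps the bookkeeping simple until commit notifications cover earlier epochs.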




