hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509620795



##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()
+          throttler = new ThroughputThrottler(time, recordsPerSec)
+          recordCount = 0
+        }
+      }
+
+      if (isLeader) {
+        recordCount += 1
+
+        val startTimeMs = time.milliseconds()
+        val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) {
+          time.milliseconds()
+        } else {
+          startTimeMs
+        }
+
+        val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload))
+        if (offset == null || offset == Long.MaxValue) {

Review comment:
       I will try to document this better, but `Long.MaxValue` is how we decided to handle the case where the epoch passed to `scheduleAppend` does not match the current epoch. This can happen because the raft epoch is updated asynchronously and there is no way to ensure the state machine has seen the latest value. The expectation is that the state machine will update its uncommitted state with an offset that can never become committed. Once it observes the epoch change, this uncommitted state will be discarded.
   
   Note that although I added the explicit check here, it is not technically 
necessary. Let me consider removing it.
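
   To make the intent concrete, here is a minimal sketch (hypothetical names and a simplified API, not the actual Kafka code) of how a state machine might track the sentinel return value and discard it once the epoch change is observed:

   ```scala
   import java.lang.{Long => JLong}

   object SentinelExample {
     // Long.MaxValue can never be a real committed offset, so it is safe
     // to record as a placeholder in uncommitted state.
     val EpochMismatchSentinel: Long = Long.MaxValue

     final case class Uncommitted(offset: Long, epoch: Int)

     // Record the result of a scheduleAppend-like call. A null return means
     // the append was rejected outright; a sentinel offset is tracked like
     // any other uncommitted entry.
     def recordAppend(pending: List[Uncommitted], offset: JLong, epoch: Int): List[Uncommitted] =
       if (offset == null) pending
       else Uncommitted(offset, epoch) :: pending

     // When the observed leader epoch changes, uncommitted state from older
     // epochs is discarded, including entries holding the sentinel offset.
     def onEpochChange(pending: List[Uncommitted], newEpoch: Int): List[Uncommitted] =
       pending.filter(_.epoch == newEpoch)
   }
   ```

   The key property is that the sentinel entry can never be matched by an advancing high watermark, so it sits harmlessly in uncommitted state until the epoch bump sweeps it away.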




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
