junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1526788841


##########
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##########
@@ -551,9 +580,11 @@ class BrokerLifecycleManager(
   }
 
   private def scheduleNextCommunication(intervalNs: Long): Unit = {
-    trace(s"Scheduling next communication at 
${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
+    val nanos = if (nextSchedulingShouldBeImmediate) 0 else intervalNs

Review Comment:
   nanos =>  adjustedIntervalNs?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())

Review Comment:
   It seems this could still be flaky. 
   
   `poll(ctx, manager, registration)
   `
   A CommunicationEvent c1 with a delay of 0 is scheduled in manager.eventQueue.
   
   `assertEquals(Set.empty, nextHeartbeatDirs())
   `
   c1 is processed in `ctx.mockChannelManager` and a 
BrokerHeartbeatResponseEvent b1 is added to `manager.eventQueue`. b1 is 
processed at `manager.eventQueue` and a CommunicationEvent c2 with a delay of 
100 is scheduled in `manager.eventQueue`.
   
   `manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
   `
   An OfflineDirEvent o1 is appended to `manager.eventQueue`, but not yet 
processed.
   
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
   `
   nextHeartbeatDirs() causes the time to advance quickly and passes 100. c2 is 
processed at `manager.eventQueue` and a HeartBeat request h1 is added to 
`ctx.mockChannelManager`.
   
   o1 is processed at `manager.eventQueue` and a CommunicationEvent c3 is 
appended to `manager.eventQueue`.  c3 is processed at `manager.eventQueue` sets 
`nextSchedulingShouldBeImmediate` to true.
   
   h1 is processed by `ctx.mockChannelManager` and adds 
BrokerHeartbeatResponseEvent b2 to `manager.eventQueue`. `manager.eventQueue` 
processes b2 and schedules a CommunicationEvent c4 with a delay of 0 in 
`manager.eventQueue`. c4 is processed at `manager.eventQueue` and a HeartBeat 
request h2 (doesn't pick up ej8Q9_d2Ri6FXNiTxKFiow) is added to 
`ctx.mockChannelManager`.
   
   
`manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))`
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())`
   Now the above assertion will fail since `nextHeartbeatDirs()` will pick up 
c4 which doesn't include ej8Q9_d2Ri6FXNiTxKFiow.
   
   
   
   



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -513,4 +514,36 @@ public void close() throws InterruptedException {
         eventHandlerThread.join();
         log.info("closed event queue.");
     }
+
+    /**
+     * Returns the deferred event that the queue is waiting for, idling until
+     * its deadline comes, if there is any.
+     * If the queue has immediate work to do, this returns empty.
+     * This is useful for unit tests, where to make progress, we need to
+     * speed the clock up until the next scheduled event is ready to run.
+     */
+    public Optional<Event> scheduledAfterIdling() {

Review Comment:
   scheduledAfterIdling => firstDeferredIfIdling ?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
 
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))

Review Comment:
   nextRequest => nextHeartbeatRequest ?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
 
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-    TestUtils.retry(60000) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is 
sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   Hmm, the next request could be `BrokerRegistration`. In that case, it would 
be incorrect to mock a `BrokerRegistrationResponse`, right?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
     result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      // If the manager is idling until scheduled events we need to advance 
the clock
+      if (manager.eventQueue.scheduledAfterIdling()
+        .filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   I couldn't find a `TimeoutEvent` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to