soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1534216092
########## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ########## @@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest { @Test def testKraftJBODMetadataVersionUpdateEvent(): Unit = { - val context = new RegistrationTestContext(configProperties) - val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) + val ctx = new RegistrationTestContext(configProperties) + val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) val controllerNode = new Node(3000, "localhost", 8021) - context.controllerNodeProvider.node.set(controllerNode) - manager.start(() => context.highestMetadataOffset.get(), - context.mockChannelManager, context.clusterId, context.advertisedListeners, + ctx.controllerNodeProvider.node.set(controllerNode) + + manager.start(() => ctx.highestMetadataOffset.get(), + ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.of(10L)) - TestUtils.retry(60000) { - assertEquals(1, context.mockChannelManager.unsentQueue.size) - assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch()) - } - context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode) - TestUtils.retry(10000) { - context.poll() - assertEquals(1000L, manager.brokerEpoch) - } + def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response)) + def nextHeartbeatRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) + def nextRegistrationRequest(epoch: Long) = + doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch))) + + // Broker registers and response sets epoch to 1000L + assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch()) + + nextHeartbeatRequest() // poll for next request as way to synchronize with the new value into brokerEpoch + assertEquals(1000L, manager.brokerEpoch) + + // Trigger JBOD MV update manager.handleKraftJBODMetadataVersionUpdate() - context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode) - TestUtils.retry(60000) { - context.time.sleep(100) - context.poll() - manager.eventQueue.wakeup() - assertEquals(1200, manager.brokerEpoch) - } + + // Depending on scheduling, the next request could either be BrokerRegistration or BrokerHeartbeat. Review Comment: You're right. This does not seem necessary. I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org