soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683014
##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
@Test
def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
- val context = new RegistrationTestContext(configProperties)
- val manager = new BrokerLifecycleManager(context.config, context.time,
"successful-registration-", isZkBroker = false,
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+ val ctx = new RegistrationTestContext(configProperties)
+ val manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"jbod-metadata-version-update", isZkBroker = false,
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
- context.controllerNodeProvider.node.set(controllerNode)
- manager.start(() => context.highestMetadataOffset.get(),
- context.mockChannelManager, context.clusterId,
context.advertisedListeners,
+ ctx.controllerNodeProvider.node.set(controllerNode)
+
+ manager.start(() => ctx.highestMetadataOffset.get(),
+ ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
Collections.emptyMap(), OptionalLong.of(10L))
- TestUtils.retry(60000) {
- assertEquals(1, context.mockChannelManager.unsentQueue.size)
- assertEquals(10L,
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
- }
- context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
- new BrokerRegistrationResponseData().setBrokerEpoch(1000)),
controllerNode)
- TestUtils.retry(10000) {
- context.poll()
- assertEquals(1000L, manager.brokerEpoch)
- }
+ def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx,
manager, prepareResponse[T](ctx, response))
+ def nextRequest() = doPoll[AbstractRequest](new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+ def nextRegistrationRequest(epoch: Long) =
+ doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+ // Broker registers and response sets epoch to 1000L
+ assertEquals(10L,
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+ nextRequest() // poll for next request as way to synchronize with the new
value into brokerEpoch
+ assertEquals(1000L, manager.brokerEpoch)
+
+ // Trigger JBOD MV update
manager.handleKraftJBODMetadataVersionUpdate()
- context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
- new BrokerRegistrationResponseData().setBrokerEpoch(1200)),
controllerNode)
- TestUtils.retry(60000) {
- context.time.sleep(100)
- context.poll()
- manager.eventQueue.wakeup()
- assertEquals(1200, manager.brokerEpoch)
- }
+
+ // We may have to accept some heartbeats before the new registration is
sent
+ while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()
Review Comment:
No, I don't think so. `prepareResponse` delegates to `MockClient`, which
expects a predetermined request-response order. `MockClient` supports a
`RequestMatcher`, which `prepareResponse` uses to extract the request, but it
does not support preparing a conditional response. We would need a larger
change to add support in `MockClient` for conditional prepared responses.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]