dajac commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r412807377



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest)
+    val request = buildRequest(updateMetadataRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      Seq()
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleUpdateMetadataRequest(request)
+    val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, 
updateMetadataRequest, capturedResponse)
+      .asInstanceOf[UpdateMetadataResponse]
+    assertEquals(expectedError, updateMetadataResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+    val controllerId = 2
+    val controllerEpoch = 6
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+    val partitionStates = Seq(
+      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+        .setTopicName("topicW")
+        .setPartitionIndex(1)
+        .setControllerEpoch(1)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(asList(0, 1))
+        .setZkVersion(2)
+        .setReplicas(asList(0, 1, 2))
+        .setIsNew(false)
+    ).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+      ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      partitionStates,
+      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+    ).build()
+    val request = buildRequest(leaderAndIsrRequest)
+    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setPartitionErrors(asList()))
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      response
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleLeaderAndIsrRequest(request)
+    val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, 
leaderAndIsrRequest, capturedResponse)
+      .asInstanceOf[LeaderAndIsrResponse]
+    assertEquals(expectedError, leaderAndIsrResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)

Review comment:
       This test and the two StopReplica tests that follow it should call 
`testStopReplicaRequest` instead of `testUpdateMetadataRequest`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest)
+    val request = buildRequest(updateMetadataRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      Seq()
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleUpdateMetadataRequest(request)
+    val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, 
updateMetadataRequest, capturedResponse)
+      .asInstanceOf[UpdateMetadataResponse]
+    assertEquals(expectedError, updateMetadataResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+    val controllerId = 2
+    val controllerEpoch = 6
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+    val partitionStates = Seq(
+      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+        .setTopicName("topicW")
+        .setPartitionIndex(1)
+        .setControllerEpoch(1)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(asList(0, 1))
+        .setZkVersion(2)
+        .setReplicas(asList(0, 1, 2))
+        .setIsNew(false)
+    ).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+      ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      partitionStates,
+      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+    ).build()
+    val request = buildRequest(leaderAndIsrRequest)
+    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setPartitionErrors(asList()))
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      response
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleLeaderAndIsrRequest(request)
+    val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, 
leaderAndIsrRequest, capturedResponse)
+      .asInstanceOf[LeaderAndIsrResponse]
+    assertEquals(expectedError, leaderAndIsrResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+    val controllerId = 0
+    val controllerEpoch = 5
+    val brokerEpoch = 230498320L
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+    val fooPartition = new TopicPartition("foo", 0)
+    val topicStates = Seq(
+      new StopReplicaTopicState()
+        .setTopicName(fooPartition.topic())
+        .setPartitionStates(Seq(new StopReplicaPartitionState()
+          .setPartitionIndex(fooPartition.partition())
+          .setLeaderEpoch(1)
+          .setDeletePartition(false)).asJava)
+    ).asJava
+    val stopReplicaRequest = new StopReplicaRequest.Builder(
+      ApiKeys.STOP_REPLICA.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      false,
+      topicStates
+    ).build()
+    val request = buildRequest(stopReplicaRequest)
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.stopReplicas(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.eq(controllerId),
+      EasyMock.eq(controllerEpoch),
+      EasyMock.eq(brokerEpoch + 1),

Review comment:
       This matcher should use `brokerEpochInRequest` rather than `brokerEpoch + 1`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest)
+    val request = buildRequest(updateMetadataRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      Seq()
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleUpdateMetadataRequest(request)
+    val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, 
updateMetadataRequest, capturedResponse)
+      .asInstanceOf[UpdateMetadataResponse]
+    assertEquals(expectedError, updateMetadataResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+    val controllerId = 2
+    val controllerEpoch = 6
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+    val partitionStates = Seq(
+      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+        .setTopicName("topicW")
+        .setPartitionIndex(1)
+        .setControllerEpoch(1)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(asList(0, 1))
+        .setZkVersion(2)
+        .setReplicas(asList(0, 1, 2))
+        .setIsNew(false)
+    ).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+      ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      partitionStates,
+      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+    ).build()
+    val request = buildRequest(leaderAndIsrRequest)
+    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setPartitionErrors(asList()))
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      response
+    )
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleLeaderAndIsrRequest(request)
+    val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, 
leaderAndIsrRequest, capturedResponse)
+      .asInstanceOf[LeaderAndIsrResponse]
+    assertEquals(expectedError, leaderAndIsrResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+    val controllerId = 0
+    val controllerEpoch = 5
+    val brokerEpoch = 230498320L

Review comment:
       `brokerEpoch` is not necessary, I suppose.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to