hachikuji commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r679344612



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1392,11 +1393,24 @@ class ReplicaManager(val config: KafkaConfig,
                   s"leader epoch $currentLeaderEpoch")
                 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
               } else {
-                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-                  s"controller $controllerId with correlation id 
$correlationId " +
-                  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-                  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
-                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+                // The controller may send LeaderAndIsr to upgrade to using 
topic IDs without bumping the epoch.

Review comment:
       nit: does it make sense to move this comment into the first `case`?

##########
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     assertEquals(topicIdAfterUpgrade.get, topicId)
     assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+    TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+    val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+    assertEquals(Some(topicId), topicIdInLog)
+
+    adminZkClient.deleteTopic(tp.topic)
+    TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+      "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+    // Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+    servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+    servers = servers ++ makeServers(3, startingIdNumber = 1)
+    val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+    assertEquals(0, originalControllerId)
+    val controller = getController().kafkaController
+    val remainingBrokers = servers.filter(_.config.brokerId != 
originalControllerId)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> servers.map(_.config.brokerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+    waitForPartitionState(tp, firstControllerEpoch, originalControllerId, 
LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    assertEquals(None, topicIdAfterCreate)
+    val emptyTopicId = controller.controllerContext.topicIds.get("t")
+    assertEquals(None, emptyTopicId)
+
+    // All logs should not have topic IDs
+    servers.foreach({ server =>

Review comment:
       nit: usual pattern is
   ```scala
   servers.foreach { server =>
   ```
   More of these in here.

##########
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     assertEquals(topicIdAfterUpgrade.get, topicId)
     assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+    TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+    val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+    assertEquals(Some(topicId), topicIdInLog)
+
+    adminZkClient.deleteTopic(tp.topic)
+    TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+      "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+    // Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+    servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+    servers = servers ++ makeServers(3, startingIdNumber = 1)
+    val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)
+    assertEquals(0, originalControllerId)
+    val controller = getController().kafkaController
+    val remainingBrokers = servers.filter(_.config.brokerId != 
originalControllerId)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> servers.map(_.config.brokerId))

Review comment:
       Do we want to use `remainingBrokers` so that the controller is not part 
of the assignment?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1392,11 +1393,24 @@ class ReplicaManager(val config: KafkaConfig,
                   s"leader epoch $currentLeaderEpoch")
                 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
               } else {
-                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
-                  s"controller $controllerId with correlation id 
$correlationId " +
-                  s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-                  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
-                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
+                // The controller may send LeaderAndIsr to upgrade to using 
topic IDs without bumping the epoch.
+                val error = requestTopicId match {
+                  case Some(topicId) if logTopicId.isEmpty =>
+                    // If we have a matching epoch, we expect the log to be 
defined.
+                    val log = localLogOrException(partition.topicPartition)
+                    log.assignTopicId(topicId)
+                    stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
+                      s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation" +
+                      s" id $correlationId epoch $controllerEpoch")

Review comment:
       nit: maybe we can be consistent and put the space at the end of the 
previous line?

##########
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -1115,6 +1115,63 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     assertEquals(topicIdAfterUpgrade.get, topicId)
     assertEquals("t", controller2.controllerContext.topicNames(topicId))
 
+    TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
+
+    val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
+    assertEquals(Some(topicId), topicIdInLog)
+
+    adminZkClient.deleteTopic(tp.topic)
+    TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),
+      "topic should have been removed from controller context after deletion")
+  }
+
+  @Test
+  def testTopicIdCreatedOnUpgradeMultiBrokerScenario(): Unit = {
+    // Simulate an upgrade scenario where the controller is still on a 
pre-topic ID IBP, but the other two brokers are upgraded.
+    servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+    servers = servers ++ makeServers(3, startingIdNumber = 1)
+    val originalControllerId = TestUtils.waitUntilControllerElected(zkClient)

Review comment:
       Is it worth asserting here that the controller is the one with IBP 
2.7?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to