dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714923995



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: 
Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
       Ah.. It is because `topicId` is also used in the statement. You could 
indeed use `this.topicId` there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to