chia7712 commented on code in PR #20014:
URL: https://github.com/apache/kafka/pull/20014#discussion_r2162028124


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1238,52 +1209,45 @@ class ReplicaManagerTest {
    */
   private def 
verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps: 
Properties,

Review Comment:
   Could you please inline this method?
   ```scala
   
     /**
      * If a partition becomes a follower and the leader is unchanged it should check for truncation
      * if the epoch has increased by more than one (which suggests it has missed an update). For
      * IBP version 2.7 onwards, we don't require this since we can truncate at any time based
      * on diverging epochs returned in fetch responses.
      *
      * The test applies a follower delta at the initial leader epoch, then a second follower delta
      * whose epoch is `leaderEpochIncrement` higher, and verifies that the mocked log manager was
      * asked to truncate to the offset supplied by the leader.
      */
     @Test
     def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
       val followerBrokerId = 0
       val leaderBrokerId = 1
       val initialLeaderEpoch = 1
       val leaderEpochIncrement = 2
       // The epoch applied by the second delta; it skips at least one epoch on purpose.
       val bumpedLeaderEpoch = initialLeaderEpoch + leaderEpochIncrement
       val countDownLatch = new CountDownLatch(1)
       val offsetFromLeader = 5
       // Prepare the mocked components for the test
       val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
         new MockTimer(time),
         0,
         bumpedLeaderEpoch,
         followerBrokerId,
         leaderBrokerId,
         countDownLatch,
         expectTruncation = true,
         localLogOffset = Optional.of(10),
         offsetFromLeader = offsetFromLeader,
         topicId = Optional.of(topicId))
   
       try {
         // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
         val partition = replicaManager.createPartition(topicPartition)
         val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
         partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
         val followerDelta = topicsCreateDelta(
           startId = followerBrokerId,
           isStartIdLeader = false,
           partitions = List(0),
           List.empty,
           topic,
           topicIds(topic),
           initialLeaderEpoch)
         replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
   
         // Verify log created and partition is hosted
         val localLog = replicaManager.localLog(topicPartition)
         assertTrue(localLog.isDefined, "Log should be created for follower after applyDelta")
         val hostedPartition = replicaManager.getPartition(topicPartition)
         assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
   
         // Make local partition a follower - because epoch increased by more than 1, truncation
         // should trigger even though leader does not change
         val epochJumpDelta = topicsCreateDelta(
           startId = followerBrokerId,
           isStartIdLeader = false,
           partitions = List(0),
           List.empty,
           topic,
           topicIds(topic),
           bumpedLeaderEpoch)
         replicaManager.applyDelta(epochJumpDelta, imageFromTopics(epochJumpDelta.apply()))
   
         // Wait (bounded) for the latch handed to the mocked components to be released
         assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
   
         // Truncation should have happened once
         verify(mockLogMgr).truncateTo(Map(topicPartition -> offsetFromLeader), isFuture = false)
   
         verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), any())
       } finally {
         // Always stop the replica manager so a failed assertion does not leak its resources
         replicaManager.shutdown(checkpointHW = false)
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to