m1a2st commented on code in PR #20335:
URL: https://github.com/apache/kafka/pull/20335#discussion_r2267134218


##########
core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala:
##########
@@ -119,28 +118,25 @@ class AbstractPartitionTest {
                                         isLeader: Boolean): Partition = {
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
 
-    val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId)
+    val replicas = Array(brokerId, remoteReplicaId)
     val isr = replicas
 
+    val partitionRegistrationBuilder = new PartitionRegistration.Builder()
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(isr)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
     if (isLeader) {
-      assertTrue(partition.makeLeader(new PartitionState()
-        .setLeader(brokerId)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setPartitionEpoch(1)
-        .setReplicas(replicas)
-        .setIsNew(true), offsetCheckpoints, None), "Expected become leader 
transition to succeed")
+      val partitionRegistration = 
partitionRegistrationBuilder.setLeader(brokerId).build()
+      assertTrue(partition.makeLeader(partitionRegistration, isNew = true, 
offsetCheckpoints, None), "Expected become leader transition to succeed")
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
     } else {
-      assertTrue(partition.makeFollower(new PartitionState()
-        .setLeader(remoteReplicaId)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setPartitionEpoch(1)
-        .setReplicas(replicas)
-        .setIsNew(true), offsetCheckpoints, None), "Expected become follower 
transition to succeed")
+      val partitionRegistration = 
partitionRegistrationBuilder.setLeader(remoteReplicaId).build()
+      assertTrue(partition.makeFollower(partitionRegistration, isNew = true, 
offsetCheckpoints, None), "Expected become follower transition to succeed")
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
-      assertEquals(None, partition.leaderLogIfLocal)
+      assert(partition.leaderLogIfLocal.isEmpty)

Review Comment:
   ```suggestion
         assertTrue(partition.leaderLogIfLocal.isEmpty)
   ```



##########
core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala:
##########
@@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest {
 
   @ParameterizedTest
   @MethodSource(Array("parameters"))
-  def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: 
util.List[Integer],
-                                    adding: util.List[Integer], removing: 
util.List[Integer],
+  def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int],
+                                    adding: Array[Int], removing: Array[Int],
                                     original: util.List[Int], 
isUnderReplicated: Boolean): Unit = {
-    val leaderState = new PartitionState()
+    val partitionRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(6)
       .setIsr(isr)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(false)
-    if (!adding.isEmpty)
-      leaderState.setAddingReplicas(adding)
-    if (!removing.isEmpty)
-      leaderState.setRemovingReplicas(removing)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    if (adding.nonEmpty)
+      partitionRegistrationBuilder.setAddingReplicas(adding)
+//      leaderState.setAddingReplicas(adding)
+    if (removing.nonEmpty)
+      partitionRegistrationBuilder.setRemovingReplicas(removing)
+//      leaderState.setRemovingReplicas(removing)

Review Comment:
   Could you clarify whether this commented-out code is still needed? If not, 
it should be removed to keep the codebase clean.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition,
    * replica manager that state is already correct and the become-follower 
steps can
    * be skipped.
    */
-  def makeFollower(partitionState: JPartitionState,
+  def makeFollower(partitionRegistration: PartitionRegistration,
+                   isNew: Boolean,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid],
                    targetLogDirectoryId: Option[Uuid] = None): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since the follower is already 
at a newer partition epoch $partitionEpoch.")
+          s", partition registration $partitionRegistration and isNew=$isNew 
since the follower is already at a newer partition epoch $partitionEpoch.")
         return false
       }
 
-      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
+      val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
       // The leader should be updated before updateAssignmentAndIsr where we 
clear the ISR. Or it is possible to meet
       // the under min isr condition during the makeFollower process and emits 
the wrong metric.
-      leaderReplicaIdOpt = Option(partitionState.leader)
-      leaderEpoch = partitionState.leaderEpoch
+      leaderReplicaIdOpt = Option(partitionRegistration.leader)
+      leaderEpoch = partitionRegistration.leaderEpoch
       leaderEpochStartOffsetOpt = None
-      partitionEpoch = partitionState.partitionEpoch
+      partitionEpoch = partitionRegistration.partitionEpoch
 
       updateAssignmentAndIsr(
-        replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
+        replicas = partitionRegistration.replicas,
         isLeader = false,
         isr = Set.empty,
-        addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
-        removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt),
-        LeaderRecoveryState.of(partitionState.leaderRecoveryState)
+        addingReplicas = partitionRegistration.addingReplicas,
+        removingReplicas = partitionRegistration.removingReplicas,
+        partitionRegistration.leaderRecoveryState
       )
 
-      createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, 
topicId, targetLogDirectoryId)
+      createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, 
targetLogDirectoryId)
 
       val followerLog = localLogOrException
       if (isNewLeaderEpoch) {
         val leaderEpochEndOffset = followerLog.logEndOffset
-        stateChangeLogger.info(s"Follower $topicPartition starts at leader 
epoch ${partitionState.leaderEpoch} from " +
-          s"offset $leaderEpochEndOffset with partition epoch 
${partitionState.partitionEpoch} and " +
-          s"high watermark ${followerLog.highWatermark}. Current leader is 
${partitionState.leader}. " +
+        stateChangeLogger.info(s"Follower $topicPartition starts at leader 
epoch ${partitionRegistration.leaderEpoch} from " +
+          s"offset $leaderEpochEndOffset with partition epoch 
${partitionRegistration.partitionEpoch} and " +
+          s"high watermark ${followerLog.highWatermark}. Current leader is 
${partitionRegistration.leader}. " +
           s"Previous leader $leaderReplicaIdOpt and previous leader epoch was 
$leaderEpoch.")
       } else {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +

Review Comment:
   ```suggestion
           stateChangeLogger.info(s"Skipped the become-follower state change 
for $topicPartition with topic id $topicId, " +
            s"partition registration $partitionRegistration and isNew=$isNew 
since it is already a follower with leader epoch $leaderEpoch.")
   ```



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition,
             currentTimeMs,
             leaderEpochStartOffset,
             isNewLeader,
-            partitionState.isr.contains(replica.brokerId)
+            isr.contains(replica.brokerId)
           )
         }
 
         // We update the leader epoch and the leader epoch start offset iff the
         // leader epoch changed.
-        leaderEpoch = partitionState.leaderEpoch
+        leaderEpoch = partitionRegistration.leaderEpoch
         leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       } else {
         stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since it is already the leader 
with leader epoch $leaderEpoch. " +
+          s"and partition state $partitionRegistration since it is already the 
leader with leader epoch $leaderEpoch. " +

Review Comment:
   I think we also need to log `isNew`.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition,
    * replica manager that state is already correct and the become-follower 
steps can
    * be skipped.
    */
-  def makeFollower(partitionState: JPartitionState,
+  def makeFollower(partitionRegistration: PartitionRegistration,
+                   isNew: Boolean,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid],
                    targetLogDirectoryId: Option[Uuid] = None): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +

Review Comment:
   ```suggestion
           stateChangeLogger.info(s"Skipped the become-follower state change 
for $topicPartition with topic id $topicId, " +
           s"partition registration $partitionRegistration and isNew=$isNew 
since the follower is already at a newer partition epoch $partitionEpoch.")
   ```



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -731,31 +730,32 @@ class Partition(val topicPartition: TopicPartition,
    * from the time when this broker was the leader last time) and setting the 
new leader and ISR.
    * If the leader replica id does not change, return false to indicate the 
replica manager.
    */
-  def makeLeader(partitionState: JPartitionState,
+  def makeLeader(partitionRegistration: PartitionRegistration,
+                 isNew: Boolean,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid],
                  targetDirectoryId: Option[Uuid] = None): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       // Partition state changes are expected to have a partition epoch larger 
or equal
       // to the current partition epoch. The latter is allowed because the 
partition epoch
       // is also updated by the AlterPartition response so the new epoch might 
be known
-      // before a LeaderAndIsr request is received or before an update is 
received via
+      // before a partitionRegistration is received or before an update is 
received via
       // the metadata log.
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since the leader is already at 
a newer partition epoch $partitionEpoch.")
+          s"and partition state $partitionRegistration since the leader is 
already at a newer partition epoch $partitionEpoch.")

Review Comment:
   Should change to `and partition registeration` and log `isNew`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to