m1a2st commented on code in PR #20335: URL: https://github.com/apache/kafka/pull/20335#discussion_r2267134218
########## core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala: ########## @@ -119,28 +118,25 @@ class AbstractPartitionTest { isLeader: Boolean): Partition = { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val replicas = java.util.List.of[Integer](brokerId, remoteReplicaId) + val replicas = Array(brokerId, remoteReplicaId) val isr = replicas + val partitionRegistrationBuilder = new PartitionRegistration.Builder() + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(1) + .setReplicas(replicas) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) if (isLeader) { - assertTrue(partition.makeLeader(new PartitionState() - .setLeader(brokerId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") + val partitionRegistration = partitionRegistrationBuilder.setLeader(brokerId).build() + assertTrue(partition.makeLeader(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become leader transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) } else { - assertTrue(partition.makeFollower(new PartitionState() - .setLeader(remoteReplicaId) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(1) - .setReplicas(replicas) - .setIsNew(true), offsetCheckpoints, None), "Expected become follower transition to succeed") + val partitionRegistration = partitionRegistrationBuilder.setLeader(remoteReplicaId).build() + assertTrue(partition.makeFollower(partitionRegistration, isNew = true, offsetCheckpoints, None), "Expected become follower transition to succeed") assertEquals(leaderEpoch, partition.getLeaderEpoch) - assertEquals(None, partition.leaderLogIfLocal) + assert(partition.leaderLogIfLocal.isEmpty) Review Comment: ```suggestion assertTrue(partition.leaderLogIfLocal.isEmpty) ``` ########## core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala: ########## @@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], - adding: util.List[Integer], removing: util.List[Integer], + def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int], + adding: Array[Int], removing: Array[Int], original: util.List[Int], isUnderReplicated: Boolean): Unit = { - val leaderState = new PartitionState() + val partitionRegistrationBuilder = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(6) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - if (!adding.isEmpty) - leaderState.setAddingReplicas(adding) - if (!removing.isEmpty) - leaderState.setRemovingReplicas(removing) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + if (adding.nonEmpty) + partitionRegistrationBuilder.setAddingReplicas(adding) +// leaderState.setAddingReplicas(adding) + if (removing.nonEmpty) + partitionRegistrationBuilder.setRemovingReplicas(removing) +// leaderState.setRemovingReplicas(removing) Review Comment: Could you clarify whether this commented-out code is still needed? If not, it should be removed to keep the codebase clean. ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition, * replica manager that state is already correct and the become-follower steps can * be skipped. */ - def makeFollower(partitionState: JPartitionState, + def makeFollower(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.") + s", partition registration $partitionRegistration and isNew=$isNew since the follower is already at a newer partition epoch $partitionEpoch.") return false } - val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch + val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet // the under min isr condition during the makeFollower process and emits the wrong metric. - leaderReplicaIdOpt = Option(partitionState.leader) - leaderEpoch = partitionState.leaderEpoch + leaderReplicaIdOpt = Option(partitionRegistration.leader) + leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = None - partitionEpoch = partitionState.partitionEpoch + partitionEpoch = partitionRegistration.partitionEpoch updateAssignmentAndIsr( - replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq, + replicas = partitionRegistration.replicas, isLeader = false, isr = Set.empty, - addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), - removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt), - LeaderRecoveryState.of(partitionState.leaderRecoveryState) + addingReplicas = partitionRegistration.addingReplicas, + removingReplicas = partitionRegistration.removingReplicas, + partitionRegistration.leaderRecoveryState ) - createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetLogDirectoryId) val followerLog = localLogOrException if (isNewLeaderEpoch) { val leaderEpochEndOffset = followerLog.logEndOffset - stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " + - s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " + - s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " + + stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " + + s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " + + s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. " + s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.") } else { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + Review Comment: ```suggestion stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " + s"partition registration $partitionRegistration and isNew=$isNew since it is already a follower with leader epoch $leaderEpoch.") ``` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition, currentTimeMs, leaderEpochStartOffset, isNewLeader, - partitionState.isr.contains(replica.brokerId) + isr.contains(replica.brokerId) ) } // We update the leader epoch and the leader epoch start offset iff the // leader epoch changed. - leaderEpoch = partitionState.leaderEpoch + leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) } else { stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since it is already the leader with leader epoch $leaderEpoch. " + + s"and partition state $partitionRegistration since it is already the leader with leader epoch $leaderEpoch. " + Review Comment: I think we also need to log `isNew`. ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition, * replica manager that state is already correct and the become-follower steps can * be skipped. */ - def makeFollower(partitionState: JPartitionState, + def makeFollower(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + Review Comment: ```suggestion stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId, " + s"partition registration $partitionRegistration and isNew=$isNew since the follower is already at a newer partition epoch $partitionEpoch.") ``` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -731,31 +730,32 @@ class Partition(val topicPartition: TopicPartition, * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. */ - def makeLeader(partitionState: JPartitionState, + def makeLeader(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetDirectoryId: Option[Uuid] = None): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { // Partition state changes are expected to have a partition epoch larger or equal // to the current partition epoch. The latter is allowed because the partition epoch // is also updated by the AlterPartition response so the new epoch might be known - // before a LeaderAndIsr request is received or before an update is received via + // before a partitionRegistration is received or before an update is received via // the metadata log. - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.") + s"and partition state $partitionRegistration since the leader is already at a newer partition epoch $partitionEpoch.") Review Comment: Should change to `and partition registeration` and log `isNew`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org