hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490464756
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -157,6 +158,44 @@ case class OngoingReassignmentState(addingReplicas: Seq[Int],
 case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
+
+
+sealed trait IsrState {
+  /**
+   * Includes only the in-sync replicas which have been committed to ZK.
+   */
+  def isr: Set[Int]
+
+  /**
+   * This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing
+   * the high watermark as well as determining which replicas are required for acks=all produce requests.
+   *
+   * Only applicable as of IBP 2.7-IV2, for older versions this will return the committed ISR
+   *
+   */
+  def maximalIsr: Set[Int]
+
+  /**
+   * Indicates if we have an AlterIsr request inflight.
+   */
+  def inflight: Boolean

Review comment:
   nit: `hasInflight`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -700,19 +764,16 @@ class Partition(val topicPartition: TopicPartition,
       inWriteLock(leaderIsrUpdateLock) {
         // check if this replica needs to be added to the ISR
         if (needsExpandIsr(followerReplica)) {
-          val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
-          info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")
-          // update ISR in ZK and cache
-          expandIsr(newInSyncReplicaIds)
+          expandIsr(followerReplica.brokerId)
         }
       }
     }
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    leaderLogIfLocal.exists { leaderLog =>
+    !hasInFlightAlterIsr && leaderLogIfLocal.exists { leaderLog =>

Review comment:
   I think we can refactor this a little bit to avoid some duplication and inconsistency. We have the following logic above when updating follower state:
   ```scala
   if (!isrState.maximalIsr.contains(followerId))
     maybeExpandIsr(followerReplica, followerFetchTimeMs)
   ```
   This is a little inconsistent because here we are checking `isrState.isr`. I'd suggest splitting this method into something like the following:
   ```scala
   def hasReachedHighWatermark(follower: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val leaderHighwatermark = leaderLog.highWatermark
       isFollowerInSync(follower, leaderHighwatermark)
     }
   }

   def canAddToIsr(followerId: Int): Boolean = {
     val current = isrState
     !current.inflight && !current.isr.contains(followerId)
   }

   def needsExpandIsr(follower: Replica): Boolean = {
     canAddToIsr(follower.brokerId) && hasReachedHighWatermark(follower)
   }
   ```
   Then we can change the logic in `maybeExpandIsr` to the following:
   ```scala
   val needsIsrUpdate = canAddToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
     ...
   ```

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+    val isrToSend: Option[Set[Int]] = isrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    if (isrToSend.isDefined) {

Review comment:
   nit: can probably rework this as `exists`
   ```scala
   isrToSendOpt.exists { isrToSend =>
     ...
   }
   ```
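   For illustration only, here is a minimal sketch of how the method might read after the `exists` rework. The body that actually builds and enqueues the AlterIsr request is not shown in the hunk above, so it is reduced to a placeholder comment:
   ```scala
   private def sendAlterIsrRequest(): Boolean = {
     val isrToSendOpt: Option[Set[Int]] = isrState match {
       case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
       case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
       case CommittedIsr(_) =>
         error(s"Asked to send AlterIsr but there are no pending updates")
         None
     }
     // Option.exists returns false for None, so the CommittedIsr case falls through
     // without a separate isDefined check.
     isrToSendOpt.exists { isrToSend =>
       // ... build the new LeaderAndIsr from isrToSend and enqueue it with the
       // AlterIsr manager (unchanged from the original body, elided here) ...
       true
     }
   }
   ```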
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition,
           // since the replica's logStartOffset may have incremented
           val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-          // check if we need to expand ISR to include this replica
-          // if it is not in the ISR yet
-          if (!inSyncReplicaIds.contains(followerId))
+          // Check if this in-sync replica needs to be added to the ISR. We look at the "maximal" ISR here so we don't
+          // send an additional Alter ISR request for the same replica

Review comment:
   Another possibility is that the replica is pending removal, in which case another `AlterIsr` will be needed. I think it might be more intuitive to make this check:
   ```scala
   if (!isrState.inflight && !isrState.isr.contains(followerId))
   ```

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -858,10 +925,10 @@ class Partition(val topicPartition: TopicPartition,
       case Some(leaderLog) =>
         val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
         if (outOfSyncReplicaIds.nonEmpty) {
-          val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
+          val newInSyncReplicaIds = isrState.isr -- outOfSyncReplicaIds

Review comment:
   It seems we do not have a check for an inflight AlterIsr after the write lock has been acquired.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -222,24 +265,36 @@ class Partition(val topicPartition: TopicPartition,
   private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
 
   newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
-  newGauge("InSyncReplicasCount", () => if (isLeader) inSyncReplicaIds.size else 0, tags)
+  newGauge("InSyncReplicasCount", () => if (isLeader) isrState.isr.size else 0, tags)
   newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
   newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
   newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
   newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
 
-  def isUnderReplicated: Boolean = isLeader && (assignmentState.replicationFactor - inSyncReplicaIds.size) > 0
+  def isUnderReplicated: Boolean = isLeader && (assignmentState.replicationFactor - isrState.isr.size) > 0
 
-  def isUnderMinIsr: Boolean = leaderLogIfLocal.exists { inSyncReplicaIds.size < _.config.minInSyncReplicas }
+  def isUnderMinIsr: Boolean = leaderLogIfLocal.exists { isrState.isr.size < _.config.minInSyncReplicas }
 
-  def isAtMinIsr: Boolean = leaderLogIfLocal.exists { inSyncReplicaIds.size == _.config.minInSyncReplicas }
+  def isAtMinIsr: Boolean = leaderLogIfLocal.exists { isrState.isr.size == _.config.minInSyncReplicas }
 
   def isReassigning: Boolean = assignmentState.isInstanceOf[OngoingReassignmentState]
 
   def isAddingLocalReplica: Boolean = assignmentState.isAddingReplica(localBrokerId)
 
   def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId)
 
+  /**
+   * Check if we have an in-flight AlterIsr
+   */
+  def hasInFlightAlterIsr: Boolean = {
+    if (isrState.inflight) {
+      trace(s"ISR update in-flight, skipping update")

Review comment:
   nit: It's surprising for a predicate like this to have a side effect. I think it would be better to do this logging at the caller when we are considering a specific change. That way we can also include in the log message information about the change we were intending to make.
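   As a rough sketch of that suggestion (the caller shown is hypothetical and only illustrates where the logging would move):
   ```scala
   // Pure predicate with no side effects: just reports whether an AlterIsr is in flight.
   def hasInFlightAlterIsr: Boolean = isrState.inflight

   // Hypothetical caller: the log line now lives next to the change we were about to make,
   // so it can say exactly what was skipped.
   private def maybeShrinkIsrExample(outOfSyncReplicaIds: Set[Int]): Unit = {
     if (hasInFlightAlterIsr)
       trace(s"ISR update in-flight, skipping shrink that would remove $outOfSyncReplicaIds")
     else
       shrinkIsr(outOfSyncReplicaIds)
   }
   ```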
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1222,22 +1288,74 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (!isrState.inflight) {
+      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
+      // a more constrained state for advancing the HW.
+      if (sendAlterIsrRequest()) {
+        // Only update our ISR state of AlterIsrManager accepts our update
+        isrState = PendingExpandIsr(isrState.isr, newInSyncReplica)
+        debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${isrState.maximalIsr.mkString(",")}]")
+      } else {
+        throw new IllegalStateException("Failed to enqueue ISR expansion even though there was no apparent in-flight ISR changes")
+      }
+    } else {
+      trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
+    }
+  }
+
+  private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
+    val newInSyncReplicaIds = isrState.isr + newInSyncReplica
+    info(s"Expanding ISR from ${isrState.isr.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newInSyncReplicaIds.toList, zkVersion)
     val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
+  }
+
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    if (useAlterIsr) {
+      shrinkIsrWithAlterIsr(outOfSyncReplicas)
+    } else {
+      shrinkIsrWithZk(isrState.isr -- outOfSyncReplicas)
+    }
+  }
+
+  private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    // This is called from maybeShrinkIsr which holds the ISR write lock
+    if (!isrState.inflight) {
+      // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
+      // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
+      // the next LeaderAndIsr
+      isrState = PendingShrinkIsr(isrState.isr, outOfSyncReplicas)
+      if (sendAlterIsrRequest()) {

Review comment:
   It might be a little more intuitive to change the order here. Something like this:
   ```scala
   val updatedIsrState = PendingShrinkIsr(isrState.isr, outOfSyncReplicas)
   if (sendAlterIsrRequest(updatedIsrState)) {
     isrState = updatedIsrState
     ...
   ```

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition,
      * is violated, that replica is considered to be out of sync
      *
      **/
-    val candidateReplicaIds = inSyncReplicaIds - localBrokerId
+    val candidateReplicaIds = isrState.maximalIsr - localBrokerId

Review comment:
   This is related to my comment above for the ISR expansion case, but it is a bit confusing to use the maximal ISR when the expectation is that we will not shrink as long as we have a pending update inflight.
   Would it be better to check for an inflight AlterIsr and document that this method returns an empty set as long as there is a pending AlterIsr request?
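   For illustration, a sketch of that alternative. The lag predicate `isFollowerOutOfSync(replicaId, maxLagMs)` is only a stand-in for the existing out-of-sync check, not the real signature:
   ```scala
   def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
     if (isrState.inflight) {
       // A pending AlterIsr is already changing the ISR; return an empty set so no
       // further shrink is attempted until the controller responds.
       Set.empty
     } else {
       // With no update in flight, isr and maximalIsr are the same set, so either works here.
       val candidateReplicaIds = isrState.isr - localBrokerId
       candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, maxLagMs))
     }
   }
   ```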