hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r477618083



##########
File path: clients/src/main/resources/common/message/AlterIsrResponse.json
##########
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "AlterIsrResponse",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top level response error code" },
+    { "name": "Topics", "type": "[]AlterIsrResponseTopics", "versions": "0+", 
"fields": [

Review comment:
       nit: I think `AlterIsrResponseTopics` should be singular (similarly for other arrays in both of these schemas).
   
   Also, I wonder if it's reasonable to leave off the `AlterIsr` prefix. We could access it as `AlterIsrResponse.TopicData` or something like that.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
+      // a more constrained state for advancing the HW.
+      val newIsr = inSyncReplicaIds + newInSyncReplica
+      pendingInSyncReplicaIds = Some(newIsr)
+      debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${newIsr.mkString(",")}] for $topicPartition")
+      alterIsr(newIsr)
+    } else {
+      debug(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica for $topicPartition")

Review comment:
       Maybe trace would be better? This could get verbose while we have an inflight AlterIsr.

##########
File path: clients/src/main/resources/common/message/AlterIsrRequest.json
##########
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "AlterIsrRequest",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
       We may as well add flexible version support for the request and response.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1234,6 +1314,36 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def alterIsr(newIsr: Set[Int]): Unit = {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+    alterIsrManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr, {
+      inWriteLock(leaderIsrUpdateLock) {
+        case Errors.NONE =>
+          debug(s"Controller accepted proposed ISR $newIsr for 
$topicPartition.")
+        case Errors.REPLICA_NOT_AVAILABLE | Errors.INVALID_REPLICA_ASSIGNMENT 
=>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition. 
Some replicas were not online or " +
+            s"in the partition assignment. Clearing pending ISR to allow 
leader to retry.")
+          pendingInSyncReplicaIds = None
+        case Errors.FENCED_LEADER_EPOCH =>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition 
since we have an old leader epoch. Not retrying.")
+        case Errors.NOT_LEADER_OR_FOLLOWER =>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition 
since we are not the leader. Not retrying.")
+        case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition 
since this partition is unknown. Not retrying.")
+        case Errors.INVALID_UPDATE_VERSION =>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition 
due to invalid ZK version. Not retrying.")
+        // Top-level errors that have been pushed down to partition level by 
AlterIsrManager
+        case Errors.STALE_BROKER_EPOCH =>
+          warn(s"Controller rejected proposed ISR $newIsr for $topicPartition 
due to a stale broker epoch. " +
+            s"Clearing pending ISR to allow leader to retry.")
+          pendingInSyncReplicaIds = None

Review comment:
       Hmm.. I am not sure it is safe to reset `pendingInSyncReplicaIds` in any case except `INVALID_UPDATE_VERSION`. For example, imagine the following sequence:
   
   1. Broker sends AlterIsr
   2. Controller writes new ISR and crashes before sending response
   3. Broker hits session expiration
   4. Broker retries AlterIsr on new controller with old broker epoch
   5. Controller responds with STALE_BROKER_EPOCH
   
   In this case, the ISR was updated, but the broker is going to revert to the old state. I think the _only_ time we can reset `pendingInSyncReplicaIds` is when we know the change could not have been applied.
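   A rough sketch of what I mean, reusing the callback shape already in this patch (`handleAlterIsrResponse` is just an illustrative name): only the error we know was rejected before any write clears the pending set.
   ```scala
   private def handleAlterIsrResponse(proposedIsr: Set[Int])(error: Errors): Unit = {
     inWriteLock(leaderIsrUpdateLock) {
       error match {
         case Errors.NONE =>
           debug(s"Controller accepted ISR $proposedIsr for $topicPartition")
         case Errors.INVALID_UPDATE_VERSION =>
           // The controller rejected the proposal outright, so nothing can have been written;
           // it is safe to clear the pending set and let the leader retry from current state.
           warn(s"ISR $proposedIsr for $topicPartition rejected due to stale zkVersion, clearing pending ISR")
           pendingInSyncReplicaIds = None
         case other =>
           // The update may or may not have been applied (e.g. STALE_BROKER_EPOCH after a
           // controller failover), so keep the pending ISR and wait for the next LeaderAndIsr.
           warn(s"ISR update $proposedIsr for $topicPartition failed with $other, awaiting LeaderAndIsr")
       }
     }
   }
   ```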

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##########
@@ -281,10 +283,10 @@ class PartitionLockTest extends Logging {
     
when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
     when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, 
ArgumentMatchers.eq(topicPartition)))
       .thenReturn(None)
-    when(stateStore.shrinkIsr(ArgumentMatchers.anyInt, 
ArgumentMatchers.any[LeaderAndIsr]))
-      .thenReturn(Some(2))
-    when(stateStore.expandIsr(ArgumentMatchers.anyInt, 
ArgumentMatchers.any[LeaderAndIsr]))
-      .thenReturn(Some(2))
+    //when(stateStore.shrinkIsr(ArgumentMatchers.anyInt, 
ArgumentMatchers.any[LeaderAndIsr]))

Review comment:
       This should be fixed

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1762,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic).foreachEntry((topic, partitionMap) => 
{
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions.add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition)
+                  .setErrorCode(error.code))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionResponses: mutable.Map[TopicPartition, Errors] = 
mutable.HashMap[TopicPartition, Errors]()
+
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = 
controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => 
currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for 
partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+                Errors.INVALID_REPLICA_ASSIGNMENT
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => 
controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          partitionResponses(tp) = Errors.NONE
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) // TODO only bump 
this for latest IBP
+        } else {
+          partitionResponses(tp) = partitionError
+          None
+        }
+    }
+
+    // Do the updates in ZK
+    info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")

Review comment:
       Maybe debug is more suitable?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -492,6 +514,8 @@ class Partition(val topicPartition: TopicPartition,
       val isr = partitionState.isr.asScala.map(_.toInt).toSet
       val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
       val removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt)
+      info(s"Leader setting ISR to $isr for $topicPartition with leader epoch 
${partitionState.leaderEpoch}")

Review comment:
       Do we need this message? It seems the one on line 537 below has more detail already. It would be useful to include the zkVersion in the message on 537 as well.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -255,6 +265,15 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  def inSyncReplicaIds(includeUncommittedReplicas: Boolean = false): Set[Int] 
= {

Review comment:
       I am wondering if we can split this into two separate methods:
   - `effectiveIsr`: takes into account any pending changes which may or may not have happened (I could probably also be convinced to call this `maximalIsr`)
   - `confirmedIsr`: the latest known value from Zookeeper (or the Controller)
   
   That makes the code easier to follow since we wouldn't have to interpret this flag. Some high-level comments might be helpful as well. For example, it's useful to mention somewhere that the high watermark is always treated with respect to the effective ISR.
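   For example, just a sketch, treating `inSyncReplicaIds` as the underlying confirmed set and using the `pendingInSyncReplicaIds: Option[Set[Int]]` from this patch:
   ```scala
   /** The latest ISR value confirmed by ZooKeeper (or the controller). */
   def confirmedIsr: Set[Int] = inSyncReplicaIds

   /**
    * The ISR including any in-flight AlterIsr proposal, which may or may not have been
    * applied yet. The high watermark is always advanced with respect to this set.
    */
   def effectiveIsr: Set[Int] = pendingInSyncReplicaIds.getOrElse(inSyncReplicaIds)
   ```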

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    unsentIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      unsentIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = time.milliseconds()
+    unsentIsrUpdates synchronized {
+      if (unsentIsrUpdates.nonEmpty) {
+        val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match {
+          case Some(brokerEpoch) => brokerEpoch
+          case None => throw new RuntimeException("Cannot send AlterIsr 
because we cannot determine broker epoch")
+        }
+
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        val callbacks = new mutable.HashMap[TopicPartition, Errors => Unit]()
+        
unsentIsrUpdates.values.groupBy(_.topicPartition.topic).foreachEntry((topic, 
items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition)
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+            callbacks(item.topicPartition) = item.callback
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          val body: AlterIsrResponse = 
response.responseBody().asInstanceOf[AlterIsrResponse]
+          val data: AlterIsrResponseData = body.data
+          Errors.forCode(data.errorCode) match {
+            case Errors.NONE =>
+              info(s"Controller handled AlterIsr request")
+              data.topics.forEach(topic => {
+                topic.partitions().forEach(partition => {
+                  callbacks(new TopicPartition(topic.name, 
partition.partitionIndex))(
+                    Errors.forCode(partition.errorCode))
+                })
+              })
+            case e: Errors =>
+              // Need to propagate top-level errors back to all partitions so 
they can react accordingly
+              warn(s"Controller returned a top-level error when handling 
AlterIsr request: $e")

Review comment:
       We seem to be losing some of the value of having a top-level error code here. As far as I can tell, the following top-level errors should be possible:
   
   1. NOT_CONTROLLER: should be retried (handled in `BrokerToControllerChannelManagerImpl`)
   2. STALE_BROKER_EPOCH: should be retried (could we do that here?)
   3. CLUSTER_AUTHORIZATION_FAILED: probably should be fatal (can we handle that here?)
   
   Seems like it might simplify the error handling if we can handle them at a corresponding granularity.
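   Something along these lines in `responseHandler`, maybe (sketch only; `scheduleRetry` is a placeholder for however we end up wiring the retry):
   ```scala
   Errors.forCode(data.errorCode) match {
     case Errors.NONE =>
       data.topics.forEach { topic =>
         topic.partitions.forEach { partition =>
           callbacks(new TopicPartition(topic.name, partition.partitionIndex))(
             Errors.forCode(partition.errorCode))
         }
       }
     case Errors.STALE_BROKER_EPOCH =>
       // Retriable: refresh the broker epoch and resend the same updates from here.
       warn("AlterIsr failed with STALE_BROKER_EPOCH, retrying with a fresh broker epoch")
       scheduleRetry()
     case Errors.CLUSTER_AUTHORIZATION_FAILED =>
       // Not retriable: the broker is not authorized to alter ISRs, so give up loudly.
       fatal("Broker is not authorized to send AlterIsr to the controller")
     case e =>
       // NOT_CONTROLLER is already retried inside BrokerToControllerChannelManagerImpl,
       // so anything else is unexpected; push it down to the waiting partitions.
       warn(s"Unexpected top-level error in AlterIsr response: $e")
       callbacks.values.foreach(_.apply(e))
   }
   ```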

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -133,6 +133,7 @@ object Partition extends KafkaMetricsGroup {
       replicaManager.delayedFetchPurgatory,
       replicaManager.delayedDeleteRecordsPurgatory)
 
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -748,7 +776,7 @@ class Partition(val topicPartition: TopicPartition,
     leaderLogIfLocal match {
       case Some(leaderLog) =>
         // keep the current immutable replica list reference
-        val curInSyncReplicaIds = inSyncReplicaIds
+        val curInSyncReplicaIds = inSyncReplicaIds(true)

Review comment:
       I have been thinking a little bit about the semantics of min.isr. Basically I am wondering whether it should be treated as a guarantee on the state of the ISR at the time the high watermark is reached (i.e. how many replicas are in the ISR), or instead as a guarantee on the progress of replication (i.e. some number of replicas have reached a given offset). We may not have ever formally decided this, but here we are taking a stance that it is the latter, because we are using the effective (uncommitted) ISR.
   
   One of the consequences of this view is that a leader may continue to accept appends satisfying min.isr even if the true ISR never reaches min.isr. For example, imagine we have the following state:
   
   replicas: (1, 2, 3)
   isr: (1)
   leader: 1
   
   Suppose that replica 2 has caught up to the leader, but the leader is unable to expand the ISR because the controller is unavailable or unreachable. With the logic here, we will nevertheless continue to satisfy acks=all requests with a min.isr of 2.
   
   I am not sure there is much choice about it, to be honest. If instead we used only the "confirmed" ISR, then we would have sort of the opposite problem. For example, consider this state:
   
   replicas: (1, 2, 3)
   isr: (1, 2)
   leader: 1
   
   Suppose the leader wants to remove 2 from the ISR. The AlterIsr is received by the controller and the state is updated, but the controller fails to send the corresponding LeaderAndIsr. Then committing on the basis of the confirmed ISR would lead to a similar problem.
   
   Here is the current documentation for the config:
   ```
     val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
       "min.insync.replicas specifies the minimum number of replicas that must acknowledge " +
       "a write for the write to be considered successful. If this minimum cannot be met, " +
       "then the producer will raise an exception (either NotEnoughReplicas or " +
       "NotEnoughReplicasAfterAppend).<br>When used together, min.insync.replicas and acks " +
       "allow you to enforce greater durability guarantees. A typical scenario would be to " +
       "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
       "produce with acks of \"all\". This will ensure that the producer raises an exception " +
       "if a majority of replicas do not receive a write."
   ```
   Even though it is named in terms of the ISR, the documentation only discusses acks from other replicas, so it seems like the implementation here is consistent even if potentially surprising in some cases.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {

Review comment:
       Can we move this check earlier in the flow so that we can skip acquiring the write lock if there is an inflight AlterIsr? Maybe it can be part of `needsExpandIsr` and `needsShrinkIsr`, for example.
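   Roughly what I have in mind (names are illustrative; `isFollowerCaughtUp` stands in for the existing "not in the ISR but caught up to the high watermark" check):
   ```scala
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
     // Cheap, read-only pre-check so we can skip the ISR write lock entirely while an
     // AlterIsr is in flight; maybeExpandIsr would repeat the check under the lock.
     pendingInSyncReplicaIds.isEmpty && isFollowerCaughtUp(followerReplica)
   }
   ```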

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    unsentIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      unsentIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = time.milliseconds()
+    unsentIsrUpdates synchronized {
+      if (unsentIsrUpdates.nonEmpty) {
+        val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match {
+          case Some(brokerEpoch) => brokerEpoch
+          case None => throw new RuntimeException("Cannot send AlterIsr 
because we cannot determine broker epoch")
+        }
+
+        val message = new AlterIsrRequestData()
+          .setBrokerId(brokerId)
+          .setBrokerEpoch(brokerEpoch)
+          .setTopics(new util.ArrayList())
+
+        val callbacks = new mutable.HashMap[TopicPartition, Errors => Unit]()
+        
unsentIsrUpdates.values.groupBy(_.topicPartition.topic).foreachEntry((topic, 
items) => {
+          val topicPart = new AlterIsrRequestTopics()
+            .setName(topic)
+            .setPartitions(new util.ArrayList())
+          message.topics().add(topicPart)
+          items.foreach(item => {
+            topicPart.partitions().add(new AlterIsrRequestPartitions()
+              .setPartitionIndex(item.topicPartition.partition)
+              .setLeaderId(item.leaderAndIsr.leader)
+              .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
+              .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
+            )
+            callbacks(item.topicPartition) = item.callback
+          })
+        })
+
+        def responseHandler(response: ClientResponse): Unit = {
+          val body: AlterIsrResponse = 
response.responseBody().asInstanceOf[AlterIsrResponse]
+          val data: AlterIsrResponseData = body.data
+          Errors.forCode(data.errorCode) match {
+            case Errors.NONE =>
+              info(s"Controller handled AlterIsr request")

Review comment:
       Probably not a useful log message

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1234,6 +1314,36 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def alterIsr(newIsr: Set[Int]): Unit = {

Review comment:
       nit: maybe `sendAlterIsrRequest`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
+      // a more constrained state for advancing the HW.
+      val newIsr = inSyncReplicaIds + newInSyncReplica
+      pendingInSyncReplicaIds = Some(newIsr)
+      debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${newIsr.mkString(",")}] for $topicPartition")
+      alterIsr(newIsr)
+    } else {
+      debug(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica for $topicPartition")
+    }
+  }
+
+  private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
+    val newInSyncReplicaIds = inSyncReplicaIds + newInSyncReplica
+    info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newInSyncReplicaIds.toList, zkVersion)
     val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
   }
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    if (useAlterIsr) {
+      shrinkIsrWithAlterIsr(outOfSyncReplicas)
+    } else {
+      shrinkIsrWithZk(inSyncReplicaIds -- outOfSyncReplicas)
+    }
+  }
+
+  private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    // This is called from maybeShrinkIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
+      // We update pendingInSyncReplicaIds here simply to prevent any further 
ISR updates from occurring until we get
+      // the next LeaderAndIsr
+      pendingInSyncReplicaIds = Some(inSyncReplicaIds)

Review comment:
       I still think we need a better name for `pendingInSyncReplicaIds` since it is misleading in this case. Maybe we could call it `overrideInSyncReplicaIds` or something like that?

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {

Review comment:
       As far as I can tell, we don't have any logic which tells us whether there is an inflight request. I am considering whether we should add one as a mechanism for batching/flow control. It might be simpler if we just allow one inflight request. While we are waiting for it to return, we can collect additional pending updates. In case we need to retry the request, we could coalesce the new updates into the request.
   
   Note that `BrokerToControllerChannelManagerImpl` currently sets max inflight requests to 1 anyway.
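   A sketch of the single-inflight idea (`inflightRequest` is a hypothetical field; the response-handler side is abbreviated in a comment):
   ```scala
   private var inflightRequest: Boolean = false

   override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
     unsentIsrUpdates synchronized {
       unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
       if (!inflightRequest) {
         inflightRequest = true
         scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)
       }
     }
   }

   // In the response handler, whether the request succeeded or needs a retry:
   //   unsentIsrUpdates synchronized {
   //     inflightRequest = false
   //     if (unsentIsrUpdates.nonEmpty)
   //       propagateIsrChanges()  // picks up (and coalesces) anything enqueued in the meantime
   //   }
   ```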

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with 
Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = 
new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for 
batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", 
propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    unsentIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      unsentIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = time.milliseconds()
+    unsentIsrUpdates synchronized {
+      if (unsentIsrUpdates.nonEmpty) {
+        val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match {

Review comment:
       Not sure about this. Do we really want to put ZooKeeper in the path for sending requests to the controller?
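   One possible shape (my assumption, not something in this patch): inject a broker-epoch supplier so the manager never has to touch ZooKeeper on the send path.
   ```scala
   class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
                             val scheduler: Scheduler,
                             val time: Time,
                             val brokerId: Int,
                             val brokerEpochSupplier: () => Long)  // e.g. the epoch KafkaServer already tracks
       extends AlterIsrManager with Logging {

     // enqueueIsrUpdate / clearPending would stay exactly as in the current patch;
     // they are stubbed here only to keep the sketch self-contained.
     override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = ()
     override def clearPending(topicPartition: TopicPartition): Unit = ()

     private def propagateIsrChanges(): Unit = {
       val brokerEpoch = brokerEpochSupplier()
       // ... build and send the AlterIsrRequest exactly as before, with no ZK call here ...
     }
   }
   ```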

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1762,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic).foreachEntry((topic, partitionMap) => 
{
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions.add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition)
+                  .setErrorCode(error.code))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionResponses: mutable.Map[TopicPartition, Errors] = 
mutable.HashMap[TopicPartition, Errors]()
+
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = 
controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => 
currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for 
partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+                Errors.INVALID_REPLICA_ASSIGNMENT
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => 
controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          partitionResponses(tp) = Errors.NONE
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) // TODO only bump 
this for latest IBP
+        } else {
+          partitionResponses(tp) = partitionError
+          None
+        }
+    }
+
+    // Do the updates in ZK
+    info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) =  
zkClient.updateLeaderAndIsr(

Review comment:
       Should we try to make `AlterIsr` an idempotent operation? I think currently, if we retry an update that was successfully applied, we will see INVALID_UPDATE_VERSION.
   
   In general, I'm a bit concerned about the number of errors that are possible through this API and how the leader is supposed to handle them. I am thinking it might make our lives easier if we return some additional information in the response about what the current state really is. Let's say that we always try to add the full state tuple to the response: (leaderId, epoch, ISR, zkVersion). Then we can go through a simple process of reconciliation:
   
   - Am I still the leader?
   - Do I have the latest epoch?
   - Has the zkVersion been bumped?
   - Did the ISR change get applied?
   
   Basically I'm looking for a reliable way to determine whether we should continue retrying the request and whether it is safe to clear the pending replica set. At the same time, I'm feeling a bit on-the-fence about relying exclusively on LeaderAndIsr for state changes. If we need to return the current state in the response anyway to properly handle errors, then perhaps we may as well allow the state to be updated as well? This would actually be closer to the flow that we have today, which is the following:
   
   1. Leader changes state in Zookeeper and updates the current ISR directly.
   2. After some delay, it posts the ISR update to isr_change_notifications.
   3. Controller picks up the notification and sends UpdateMetadata to all the brokers.
   
   Notice that the controller does not send LeaderAndIsr to the followers in this flow. What we could do is something more like the following:
   
   1. Leader sends AlterIsr to controller.
   2. Controller applies the change and returns the updated state.
   3. Leader receives the response and applies the state change.
   4. After some delay, the controller sends UpdateMetadata to the brokers with the change.
   
   If we did this, then we wouldn't need to have the controller bump the epoch when handling AlterIsr. Just as we do today, we can reserve epoch bumps for controller-initiated changes.
   
   Then we might be able to simplify the error handling to the following:
   
   - If the epoch is the same and we are still the leader, then apply the update
   - If the epoch is higher, leave pendingIsr set and do not bother retrying
   - Otherwise just keep retrying
   
   What do you think?
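   
   To make the reconciliation concrete, here is a rough leader-side sketch, assuming the response is extended with the controller's current (leader, leaderEpoch, isr, zkVersion) and treating `inSyncReplicaIds`/`zkVersion` as the underlying mutable fields in `Partition`; `retryAlterIsr` is a placeholder:
   ```scala
   private def reconcileAlterIsrResponse(controllerState: LeaderAndIsr): Unit = {
     inWriteLock(leaderIsrUpdateLock) {
       if (controllerState.leaderEpoch == leaderEpoch && controllerState.leader == localBrokerId) {
         // Same epoch and still the leader: apply whatever the controller has (whether or not
         // it matches our proposal) and clear the pending set.
         inSyncReplicaIds = controllerState.isr.toSet
         zkVersion = controllerState.zkVersion
         pendingInSyncReplicaIds = None
       } else if (controllerState.leaderEpoch > leaderEpoch) {
         // A controller-initiated change is on its way; keep the pending set and wait for
         // the LeaderAndIsr request rather than retrying.
       } else {
         // Stale or ambiguous response: keep retrying the AlterIsr.
         retryAlterIsr()
       }
     }
   }
   ```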
   

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1762,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic).foreachEntry((topic, partitionMap) => 
{
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions.add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition)
+                  .setErrorCode(error.code))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionResponses: mutable.Map[TopicPartition, Errors] = 
mutable.HashMap[TopicPartition, Errors]()
+
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = 
controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => 
currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for 
partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+                Errors.INVALID_REPLICA_ASSIGNMENT
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => 
controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          partitionResponses(tp) = Errors.NONE
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) // TODO only bump 
this for latest IBP

Review comment:
       Don't forget the TODO!

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1762,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic).foreachEntry((topic, partitionMap) => 
{
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions.add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition)
+                  .setErrorCode(error.code))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionResponses: mutable.Map[TopicPartition, Errors] = 
mutable.HashMap[TopicPartition, Errors]()
+
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = 
controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => 
currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for 
partition $tp. Proposed ISR=$newLeaderAndIsr.isr assignment=$currentAssignment")
+                Errors.INVALID_REPLICA_ASSIGNMENT
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => 
controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. 
Proposed ISR=$newLeaderAndIsr.isr")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          partitionResponses(tp) = Errors.NONE
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion) // TODO only bump 
this for latest IBP
+        } else {
+          partitionResponses(tp) = partitionError
+          None
+        }
+    }
+
+    // Do the updates in ZK
+    info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) =  
zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = 
finishedUpdates.flatMap {
+      case (partition: TopicPartition, isrOrError: Either[Throwable, 
LeaderAndIsr]) =>
+      isrOrError match {
+        case Right(updatedIsr) =>
+          info("ISR for partition %s updated to [%s] and zkVersion updated to 
[%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))

Review comment:
       Could be debug perhaps?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {

Review comment:
       The conversion logic is a tad annoying, but it makes the rest of the code nicer. I'm ok with it. That said, could we use Scala conventions, e.g.:
   ```scala
       alterIsrRequest.topics.forEach { topicReq => 
         topicReq.partitions.forEach { partitionReq =>
   ```

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -278,10 +284,10 @@ class KafkaController(val config: KafkaConfig,
   private def onControllerResignation(): Unit = {
     debug("Resigning")
     // de-register listeners
-    
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
     zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
     zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
     
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+    zkClient.unregisterStateChangeHandler(isrChangeNotificationHandler.path)

Review comment:
       Any particular reason to change the order here?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -113,9 +124,10 @@ class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache
       brokerToControllerListenerName, time, threadName)
   }
 
-  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-                                  callback: RequestCompletionHandler): Unit = {
+  override def sendRequest(request: AbstractRequest.Builder[_ <: 
AbstractRequest],
+                  callback: RequestCompletionHandler): Unit = {

Review comment:
       nit: misaligned

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1756,6 +1762,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => 
topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], 
Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic).foreachEntry((topic, partitionMap) => 
{
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions.add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition)
+                  .setErrorCode(error.code))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],

Review comment:
       We should probably have a try/catch in here somewhere for the unhandled errors to make sure that the callback always gets applied.
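   For instance, just a sketch (the existing body would move inside the `try`):
   ```scala
   private def processAlterIsr(brokerId: Int, brokerEpoch: Long,
                               isrsToAlter: Map[TopicPartition, LeaderAndIsr],
                               callback: AlterIsrCallback): Unit = {
     try {
       // ... the existing validation and ZK update logic ...
     } catch {
       case e: Throwable =>
         error(s"Error when processing AlterIsr from broker $brokerId", e)
         // Make sure the broker always gets a response, even on unexpected failures.
         callback.apply(Right(Errors.UNKNOWN_SERVER_ERROR))
     }
   }
   ```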




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

