hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490527627



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+    val isrToSend: Option[Set[Int]] = isrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    if (isrToSend.isDefined) {
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.get.toList, zkVersion)
+      val callbackPartial = handleAlterIsrResponse(isrToSend.get, _ : Either[Errors, LeaderAndIsr])
+      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+    } else {
+      false
+    }
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+    inWriteLock(leaderIsrUpdateLock) {
+      result match {
+        case Left(error: Errors) => error match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.")
+          case Errors.FENCED_LEADER_EPOCH =>
+            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+          case _ =>

Review comment:
       Since `INVALID_UPDATE_VERSION` is one of the expected errors at this 
level, can we add a separate case for it? For unexpected errors, we might want 
to log at warn level.
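   Something along these lines, as a rough sketch (what to do next on a version conflict is up to the retry logic):
   ```scala
   case Errors.INVALID_UPDATE_VERSION =>
     // Expected outcome: the controller saw a conflicting ISR version
     debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to a conflicting update version.")
   case _ =>
     // Anything else is unexpected, so surface it more loudly
     warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to unexpected error $error")
   ```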

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>
+            val topicResp = new AlterIsrResponseData.TopicData()
+              .setName(entry._1)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            entry._2.foreach { partitionEntry =>
+              partitionEntry._2 match {
+                case Left(error) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setErrorCode(error.code))
+                case Right(leaderAndIsr) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setLeaderId(leaderAndIsr.leader)
+                    .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                    .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                    .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+              }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+        mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+                Errors.FENCED_LEADER_EPOCH
+              } else {
+                Errors.NONE
+              }
+            case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+          }
+          if (partitionError == Errors.NONE) {
+            Some(tp -> newLeaderAndIsr)
+          } else {
+            partitionResponses(tp) = Left(partitionError)
+            None
+          }
+      }
+
+      // Do the updates in ZK
+      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+        case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+          isrOrError match {
+            case Right(updatedIsr) =>
+              debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+              partitionResponses(partition) = Right(updatedIsr)
+              Some(partition -> updatedIsr)
+            case Left(error) =>
+              warn(s"Failed to update ISR for partition $partition", error)
+              partitionResponses(partition) = Left(Errors.forException(error))
+              None
+          }
+      }
+
+      badVersionUpdates.foreach(partition => {
+        warn(s"Failed to update ISR for partition $partition, bad ZK version")
+        partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+      })
+
+      def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
+        val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
+        debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
+        sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
+      }
+
+      // Update our cache and send out metadata updates
+      updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+      processUpdateNotifications(isrsToAlter.keys.toSeq)
+
+      Left(partitionResponses)
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterIsr request", e)

Review comment:
       Shall we include some details about the failed request?
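   For example, a sketch of what that could look like (both `brokerId` and `isrsToAlter` are already in scope here):
   ```scala
   error(s"Error when processing AlterIsr request from broker $brokerId for partitions ${isrsToAlter.keySet.mkString(",")}", e)
   ```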

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>
+            val topicResp = new AlterIsrResponseData.TopicData()
+              .setName(entry._1)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            entry._2.foreach { partitionEntry =>
+              partitionEntry._2 match {
+                case Left(error) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setErrorCode(error.code))
+                case Right(leaderAndIsr) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setLeaderId(leaderAndIsr.leader)
+                    .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                    .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                    .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+              }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+        mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+                Errors.FENCED_LEADER_EPOCH
+              } else {
+                Errors.NONE
+              }
+            case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+          }
+          if (partitionError == Errors.NONE) {
+            Some(tp -> newLeaderAndIsr)
+          } else {
+            partitionResponses(tp) = Left(partitionError)
+            None
+          }
+      }
+
+      // Do the updates in ZK
+      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+        case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+          isrOrError match {
+            case Right(updatedIsr) =>
+              debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))

Review comment:
       nit: rewrite with `$`
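   i.e. something like:
   ```scala
   debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.zkVersion}]")
   ```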

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>
+            val topicResp = new AlterIsrResponseData.TopicData()
+              .setName(entry._1)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            entry._2.foreach { partitionEntry =>
+              partitionEntry._2 match {
+                case Left(error) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setErrorCode(error.code))
+                case Right(leaderAndIsr) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setLeaderId(leaderAndIsr.leader)
+                    .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                    .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                    .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+              }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+        mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+                Errors.FENCED_LEADER_EPOCH
+              } else {
+                Errors.NONE
+              }
+            case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+          }
+          if (partitionError == Errors.NONE) {
+            Some(tp -> newLeaderAndIsr)
+          } else {
+            partitionResponses(tp) = Left(partitionError)
+            None
+          }
+      }
+
+      // Do the updates in ZK
+      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+        case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+          isrOrError match {
+            case Right(updatedIsr) =>
+              debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+              partitionResponses(partition) = Right(updatedIsr)
+              Some(partition -> updatedIsr)
+            case Left(error) =>
+              warn(s"Failed to update ISR for partition $partition", error)
+              partitionResponses(partition) = Left(Errors.forException(error))
+              None
+          }
+      }
+
+      badVersionUpdates.foreach(partition => {
+        warn(s"Failed to update ISR for partition $partition, bad ZK version")

Review comment:
       I think `warn` might be too high here. We should expect to see some of 
these even if the cluster is working properly. How about debug?
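   i.e.:
   ```scala
   badVersionUpdates.foreach { partition =>
     // A conflicting zkVersion is expected from time to time in a healthy cluster
     debug(s"Failed to update ISR for partition $partition, bad ZK version")
     partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
   }
   ```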

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3054,6 +3054,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+    val alterIsrRequest = request.body[AlterIsrRequest]
+
+    if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {

Review comment:
       nit: it's subjective, so feel free to ignore, but I find this a little easier to read if we handle the error cases first. So:
   ```scala
   if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
     sendResponseMaybeThrottle(request, requestThrottleMs =>
       alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
   } else if (!controller.isActive) {
     sendResponseMaybeThrottle(request, requestThrottleMs =>
       alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception()))
   } else {
   ...
   }
   ```
   Basically we're discarding the error cases so that the successful path 
continues flowing downward and we're avoiding extra nesting. Like I said, it's 
subjective.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>
+            val topicResp = new AlterIsrResponseData.TopicData()
+              .setName(entry._1)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            entry._2.foreach { partitionEntry =>
+              partitionEntry._2 match {
+                case Left(error) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setErrorCode(error.code))
+                case Right(leaderAndIsr) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setLeaderId(leaderAndIsr.leader)
+                    .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                    .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                    .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+              }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =

Review comment:
       nit: use type inference. It's conventional to write this as 
   ```scala
   val partitionResponses = mutable.Map.empty[TopicPartition, Either[Errors, LeaderAndIsr]]
   ```

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>
+            val topicResp = new AlterIsrResponseData.TopicData()
+              .setName(entry._1)
+              .setPartitions(new util.ArrayList())
+            resp.topics.add(topicResp)
+            entry._2.foreach { partitionEntry =>
+              partitionEntry._2 match {
+                case Left(error) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setErrorCode(error.code))
+                case Right(leaderAndIsr) => topicResp.partitions.add(
+                  new AlterIsrResponseData.PartitionData()
+                    .setPartitionIndex(partitionEntry._1.partition)
+                    .setLeaderId(leaderAndIsr.leader)
+                    .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                    .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                    .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+              }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+        mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+                Errors.FENCED_LEADER_EPOCH
+              } else {
+                Errors.NONE
+              }
+            case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+          }
+          if (partitionError == Errors.NONE) {
+            Some(tp -> newLeaderAndIsr)
+          } else {
+            partitionResponses(tp) = Left(partitionError)
+            None
+          }
+      }
+
+      // Do the updates in ZK
+      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+        case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+          isrOrError match {
+            case Right(updatedIsr) =>
+              debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+              partitionResponses(partition) = Right(updatedIsr)
+              Some(partition -> updatedIsr)
+            case Left(error) =>
+              warn(s"Failed to update ISR for partition $partition", error)
+              partitionResponses(partition) = Left(Errors.forException(error))
+              None
+          }
+      }
+
+      badVersionUpdates.foreach(partition => {
+        warn(s"Failed to update ISR for partition $partition, bad ZK version")
+        partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+      })
+
+      def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
+        val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
+        debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")

Review comment:
       nit: I think we can get rid of this. The logging in 
`ControllerChannelManager.sendUpdateMetadataRequests` is probably good enough.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -298,9 +298,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
         socketServer.startup(startProcessingRequests = false)
 
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1358,7 +1366,8 @@ class ReplicaManager(val config: KafkaConfig,
                 stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
                   s"controller $controllerId with correlation id 
$correlationId " +
                   s"epoch $controllerEpoch for partition $topicPartition since 
its associated " +
-                  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch")
+                  s"leader epoch $requestLeaderEpoch matches the current 
leader epoch " +
+                  s"and the zk version $requestZkVersion matches the current 
zk version")

Review comment:
       nit: not sure it makes sense to include this change any longer

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults.groupBy(_._1.topic).foreach { entry =>

Review comment:
       nit: can we avoid using `_1` and `_2`? It's a lot easier to follow if they are named.
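   For example, a sketch with named bindings (`partitionsByTp` is just an illustrative name):
   ```scala
   partitionResults.groupBy { case (tp, _) => tp.topic }.foreach { case (topic, partitionsByTp) =>
     val topicResp = new AlterIsrResponseData.TopicData()
       .setName(topic)
       .setPartitions(new util.ArrayList())
     resp.topics.add(topicResp)
     partitionsByTp.foreach { case (tp, errorOrLeaderAndIsr) =>
       // same Left/Right handling as before, using tp.partition
     }
   }
   ```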

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -200,9 +241,11 @@ class Partition(val topicPartition: TopicPartition,
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
-  @volatile var inSyncReplicaIds = Set.empty[Int]
+  @volatile var isrState: IsrState = CommittedIsr(Set.empty)

Review comment:
       I wonder if we should be exposing this. Would it be enough to have a 
`def inSyncReplicaIds = isrState.isr`? One thing we need to be a little careful 
of is the fact that we now have a volatile variable with multiple fields. So if 
you try to access two fields through the `isrState` reference, you could see 
inconsistent data.
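   Something like this sketch, for instance (`maximalIsr` being the other field on `IsrState`):
   ```scala
   def inSyncReplicaIds: Set[Int] = isrState.isr

   // When more than one field is needed, read the volatile reference once so
   // that isr and maximalIsr come from the same snapshot
   val snapshot = isrState
   val (committed, maximal) = (snapshot.isr, snapshot.maximalIsr)
   ```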

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1257,20 +1250,18 @@ class PartitionTest extends AbstractPartitionTest {
 
     // On initialization, the replica is considered caught up and should not be removed
     partition.maybeShrinkIsr()
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
 
     // If enough time passes without a fetch update, the ISR should shrink
     time.sleep(partition.replicaLagTimeMaxMs + 1)
-    val updatedLeaderAndIsr = LeaderAndIsr(
-      leader = brokerId,
-      leaderEpoch = leaderEpoch,
-      isr = List(brokerId),
-      zkVersion = 1)
-    when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
 
+    // Shrink the ISR
     partition.maybeShrinkIsr()
-    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
-    assertEquals(10L, partition.localLogOrException.highWatermark)
+    assertEquals(alterIsrManager.isrUpdates.size, 1)

Review comment:
       I may have missed it, but do we have tests which verify error handling? 
I see tests which verify requests get sent, but at a quick glance I didn't see 
tests of responses.
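   For example, a rough sketch of a response-error test (assuming the mock exposes the enqueued item's callback field):
   ```scala
   time.sleep(partition.replicaLagTimeMaxMs + 1)
   partition.maybeShrinkIsr()

   // Complete the in-flight AlterIsr with an error and verify the pending
   // shrink is not committed
   val inflight = alterIsrManager.isrUpdates.dequeue()
   inflight.callback(Left(Errors.INVALID_UPDATE_VERSION))
   assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
   ```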

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1065,6 +1065,25 @@ object TestUtils extends Logging {
                   logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
   }
 
+  class TestAlterIsrManager extends AlterIsrManager {

Review comment:
       nit: sort of conventional to use a name like `MockAlterIsrManager`

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1149,31 +1153,27 @@ class PartitionTest extends AbstractPartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
+    assertEquals(Set(brokerId), partition.isrState.isr)
     assertEquals(3L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
-    // The next update should bring the follower back into the ISR
-    val updatedLeaderAndIsr = LeaderAndIsr(
-      leader = brokerId,
-      leaderEpoch = leaderEpoch,
-      isr = List(brokerId, remoteBrokerId),
-      zkVersion = 1)
-    when(stateStore.expandIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
-
     partition.updateFollowerFetchState(remoteBrokerId,
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
       followerStartOffset = 0L,
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
+    assertEquals(alterIsrManager.isrUpdates.size, 1)
+    assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, List(brokerId, remoteBrokerId))
+    assertEquals(Set(brokerId), partition.isrState.isr)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
   }
 
   @Test
   def testIsrNotExpandedIfUpdateFails(): Unit = {
+    // TODO maybe remove this test now?

Review comment:
       Need to address the TODOs in this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

