mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465748120



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1771,6 +1776,141 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => {
+      val tp = new TopicPartition(topicReq.name(), partitionReq.partitionIndex())
+      val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+      isrsToAlter.put(tp, new LeaderAndIsr(partitionReq.leaderId(), partitionReq.leaderEpoch(), newIsr, partitionReq.currentIsrVersion()))
+    }))
+
+    def responseCallback(results: Either[Map[TopicPartition, Errors], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code())
+        case Left(partitions: Map[TopicPartition, Errors]) =>
+          resp.setTopics(new util.ArrayList())
+          partitions.groupBy(_._1.topic()).foreachEntry((topic, partitionMap) => {
+            val topicResp = new AlterIsrResponseTopics()
+              .setName(topic)
+              .setPartitions(new util.ArrayList())
+            resp.topics().add(topicResp)
+            partitionMap.foreachEntry((partition, error) => {
+              topicResp.partitions().add(
+                new AlterIsrResponsePartitions()
+                  .setPartitionIndex(partition.partition())
+                  .setErrorCode(error.code()))
+            })
+          })
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId(), alterIsrRequest.brokerEpoch(), isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val partitionErrors: mutable.Map[TopicPartition, Errors] = mutable.HashMap[TopicPartition, Errors]()
+
+    val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+      case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+        val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leader != currentLeaderAndIsr.leader) {
+              Errors.NOT_LEADER_OR_FOLLOWER
+            } else if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+              Errors.FENCED_LEADER_EPOCH
+            } else {
+              val currentAssignment = controllerContext.partitionReplicaAssignment(tp)
+              if (!newLeaderAndIsr.isr.forall(replicaId => currentAssignment.contains(replicaId))) {
+                warn(s"Some of the proposed ISR are not in the assignment for partition $tp. Proposed ISR=${newLeaderAndIsr.isr} assignment=$currentAssignment")
+                Errors.INVALID_REQUEST
+              } else if (!newLeaderAndIsr.isr.forall(replicaId => controllerContext.isReplicaOnline(replicaId, tp))) {
+                warn(s"Some of the proposed ISR are offline for partition $tp. Proposed ISR=${newLeaderAndIsr.isr}")
+                Errors.INVALID_REQUEST
+              } else {
+                Errors.NONE
+              }
+            }
+          case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+        }
+        if (partitionError == Errors.NONE) {
+          // Bump the leaderEpoch for partitions that we're going to write
+          Some(tp -> newLeaderAndIsr.newEpochAndZkVersion)
+        } else {
+          partitionErrors.put(tp, partitionError)
+          None
+        }
+    }
+
+    // Do the updates in ZK
+    info(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+      case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+      isrOrError match {
+        case Right(updatedIsr) =>
+          info("ISR for partition %s updated to [%s] and zkVersion updated to 
[%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+          Some(partition -> updatedIsr)
+        case Left(error) =>
+          warn(s"Failed to update ISR for partition $partition", error)
+          partitionErrors.put(partition, Errors.forException(error))
+          None
+      }
+    }
+
+    badVersionUpdates.foreach(partition => {
+      warn(s"Failed to update ISR for partition $partition, bad ZK version")
+      partitionErrors.put(partition, Errors.INVALID_ISR_VERSION)
+    })
+
+    // Update our cache
+    updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+
+    // Send back AlterIsr response
+    callback.apply(Left(partitionErrors))
+
+    // Send out LeaderAndIsr for successful updates
+    brokerRequestBatch.newBatch()
+
+    // Send LeaderAndIsr for all requested partitions

Review comment:
       I think we need to send LeaderAndIsr for all of the given partitions, whether we updated the ISR or not. In cases where an update failed, the leaders likely have stale metadata, so sending the request anyway lets us proactively push them the latest state.
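
       A rough sketch of what that could look like, assuming the existing `ControllerBrokerRequestBatch` / `ControllerContext` helpers and building on the `brokerRequestBatch.newBatch()` call already in the diff (illustrative only, signatures may differ slightly from the actual code):

```scala
// Sketch only: send LeaderAndIsr for every partition in the request, not just
// the ones whose ISR update succeeded, so leaders holding stale metadata get
// refreshed with the controller's current view.
isrsToAlter.keys.foreach { tp =>
  controllerContext.partitionLeadershipInfo(tp).foreach { leaderIsrAndControllerEpoch =>
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
      controllerContext.partitionReplicaAssignment(tp),        // all replicas of the partition
      tp,
      leaderIsrAndControllerEpoch,                              // latest cached leader/ISR state
      controllerContext.partitionFullReplicaAssignment(tp),
      isNew = false)
  }
}
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
```

       The point is just to iterate over `isrsToAlter.keys` rather than only the successful updates, so partitions whose ZK write failed still receive the controller's current state.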




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

