mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r494520245
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig, } } + def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = { + val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() + + alterIsrRequest.topics.forEach { topicReq => + topicReq.partitions.forEach { partitionReq => + val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex) + val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt) + isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion)) + } + } + + def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = { + val resp = new AlterIsrResponseData() + results match { + case Right(error) => + resp.setErrorCode(error.code) + case Left(partitionResults) => + resp.setTopics(new util.ArrayList()) + partitionResults + .groupBy { case (tp, _) => tp.topic } // Group by topic + .foreach { case (topic, partitions) => + // Add each topic part to the response + val topicResp = new AlterIsrResponseData.TopicData() + .setName(topic) + .setPartitions(new util.ArrayList()) + resp.topics.add(topicResp) + partitions.foreach { case (tp, errorOrIsr) => + // Add each partition part to the response (new ISR or error) + errorOrIsr match { + case Left(error) => topicResp.partitions.add( + new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + case Right(leaderAndIsr) => topicResp.partitions.add( + new AlterIsrResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderId(leaderAndIsr.leader) + .setLeaderEpoch(leaderAndIsr.leaderEpoch) + .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava) + .setCurrentIsrVersion(leaderAndIsr.zkVersion)) + } + } + } + } + callback.apply(resp) + } + + 
eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback)) + } + + private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr], + callback: AlterIsrCallback): Unit = { + + // Handle a few short-circuits + if (!isActive) { + callback.apply(Right(Errors.NOT_CONTROLLER)) + return + } + + val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) + if (brokerEpochOpt.isEmpty) { + info(s"Ignoring AlterIsr due to unknown broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return + } + + if (!brokerEpochOpt.contains(brokerEpoch)) { + info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for broker $brokerId") + callback.apply(Right(Errors.STALE_BROKER_EPOCH)) + return + } + + val response = try { + val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]() + + // Determine which partitions we will accept the new ISR for + val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap { + case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) => + val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match { + case Some(leaderIsrAndControllerEpoch) => + val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr + if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) { Review comment: I was trying to think some kind of race with a zombie leader trying to update the ISR, however this would get fenced by the leader epoch. 
This should be pretty easy to add ########## File path: core/src/main/scala/kafka/cluster/Partition.scala ########## @@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition, } } + private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = { + val isrToSendOpt: Option[Set[Int]] = proposedIsrState match { + case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId) + case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds) + case CommittedIsr(_) => + error(s"Asked to send AlterIsr but there are no pending updates") + None + } + isrToSendOpt.exists { isrToSend => + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion) + val callbackPartial = handleAlterIsrResponse(isrToSend, _ : Either[Errors, LeaderAndIsr]) + alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial)) + } + } + + private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = { + inWriteLock(leaderIsrUpdateLock) { + result match { + case Left(error: Errors) => error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION => + debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.") + case Errors.FENCED_LEADER_EPOCH => + debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.") + case Errors.INVALID_UPDATE_VERSION => + debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Retrying.") + sendAlterIsrRequest(isrState) + case _ => + warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.") + sendAlterIsrRequest(isrState) Review comment: True, we could see a new ISR from controller initiated changes via LeaderAndIsr while our request is in-flight. 
We have a check for this on successful responses, but we should also check here. Since our request failed, we don't have a leaderEpoch to check against, so I think the best we can do is check whether `isrState` is still pending before re-sending the request. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org