mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465029352



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    //val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+    /*if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+      // TODO is INVALID_REQUEST a reasonable error here?
+      callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+      info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+      callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+      return
+    }*/
+
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    val resp = new AlterIsrResponseData()
+    resp.setTopics(new util.ArrayList())
+
+    alterIsrRequest.topics().forEach(topicReq => {
+      val topicResp = new AlterIsrResponseTopics()
+        .setName(topicReq.name())
+        .setPartitions(new util.ArrayList())
+      resp.topics().add(topicResp)
+
+      topicReq.partitions().forEach(partitionReq => {
+        val partitionResp = new AlterIsrResponsePartitions()
+          .setPartitionIndex(partitionReq.partitionIndex())
+        topicResp.partitions().add(partitionResp)
+
+        // For each partition who's ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
       Ah, I see what you mean. Initially, I was concerned about blocking for 
too long while waiting for a response, but it looks like there is precedent for 
this pattern for some requests (reassignment, leader election, controlled 
shutdown). I'll move this validation and the callback down into the event 
processor method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to