hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r556053749



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -112,68 +111,74 @@ class DefaultAlterIsrManager(
   val brokerEpochSupplier: () => Long
 ) extends AlterIsrManager with Logging with KafkaMetricsGroup {
 
-  // Used to allow only one pending ISR update per partition
-  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+  // Used to allow only one pending ISR update per partition (visible for 
testing)
+  private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] 
= new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
   private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
-
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null
+    maybePropagateIsrChanges()
+    enqueued
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
+  private[server] def maybePropagateIsrChanges(): Unit = {
+    // Send all pending items if there is not already a request in-flight.
     if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map
+      // Copy current unsent ISRs but don't remove from the map, they get 
cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = {
+    if(!inflightRequest.compareAndSet(true, false)) {

Review comment:
       nit: space after `if`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to