mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r555127356



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,78 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protects the updates of the inflight flag and prevents new pending items 
from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantReadWriteLock = new 
ReentrantReadWriteLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    val (didSubmit, needsPropagate) = inReadLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+        (true, !inflightRequest)
+      } else {
+        (false, false)
+      }
+    }
+    if (needsPropagate) {
+      propagateIsrChanges(true)
+    }
+    didSubmit
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
-      sendRequest(inflightAlterIsrItems.toSeq)
+  private def propagateIsrChanges(checkInflight: Boolean): Unit = 
inWriteLock(inflightLock) {

Review comment:
       I was thinking of the case where lots of partitions come up at once, such 
as in a rolling restart scenario. However, not much happens inside the submit 
call, so the locking overhead is probably negligible. I tried this change out 
this morning and saw no noticeable difference. I'll polish it up and fix the tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to