hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r555294948



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protect updates of the inflight flag and prevent additional pending items 
from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantLock = new ReentrantLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    inLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+        maybePropagateIsrChanges()
+        true
+      } else {
+        false
+      }
+    }
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map
+  private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+    // Send all pending items if there is not already a request in-flight.
+    if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+      // Copy current unsent ISRs but don't remove from the map, they get 
cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
+      inflightRequest = true
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+    if (!inflightRequest) {
+      warn("Attempting to clear AlterIsr in-flight flag when no apparent 
request is in-flight")
     }
+    inflightRequest = false
+  }
 
+  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+    val message = buildRequest(inflightAlterIsrItems)
     debug(s"Sending AlterIsr to controller $message")
 
     // We will not timeout AlterISR request, instead letting it retry 
indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the 
existing isrState
-    // which causes the inflight requests to be ignored.
+    // which causes the response for those partitions to be ignored.
     controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
-          try {
-            debug(s"Received AlterIsr response $response")
-            val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-            handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
-          } finally {
-            clearInflightRequests()
+          debug(s"Received AlterIsr response $response")
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems) match {
+            case Errors.NONE =>
+              // In the normal case, check for pending updates to send 
immediately
+              clearInFlightRequest()

Review comment:
       nit: shall we pull this out of the match since it is done regardless? 
Moving it before `handleAlterIsrResponse` would also make the code a little 
more resilient, since we got rid of the `try/finally`.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protect updates of the inflight flag and prevent additional pending items 
from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantLock = new ReentrantLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, 
TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == 
null
+    inLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, 
alterIsrItem) == null) {
+        maybePropagateIsrChanges()
+        true
+      } else {
+        false
+      }
+    }
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, 
true)) {
-      // Copy current unsent ISRs but don't remove from the map
+  private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+    // Send all pending items if there is not already a request in-flight.
+    if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+      // Copy current unsent ISRs but don't remove from the map, they get 
cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => 
inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
+      inflightRequest = true
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+    if (!inflightRequest) {
+      warn("Attempting to clear AlterIsr in-flight flag when no apparent 
request is in-flight")
     }
+    inflightRequest = false
+  }
 
+  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+    val message = buildRequest(inflightAlterIsrItems)
     debug(s"Sending AlterIsr to controller $message")
 
     // We will not timeout AlterISR request, instead letting it retry 
indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the 
existing isrState
-    // which causes the inflight requests to be ignored.
+    // which causes the response for those partitions to be ignored.
     controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
-          try {
-            debug(s"Received AlterIsr response $response")
-            val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-            handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
-          } finally {
-            clearInflightRequests()
+          debug(s"Received AlterIsr response $response")
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems) match {

Review comment:
       Just double-checking our locking order. When we call `submit` from 
`Partition`, we first have the leader and ISR write lock and then we acquire 
the inflight lock added here. Now when we call `handleAlterIsrResponse`, we may 
need to reacquire the leader and ISR write lock, but that is ok, because we do 
not need to hold the inflight lock when we do so. I think it might be worth 
adding some comments on the locking order somewhere in this class since the use 
of the leader and ISR lock is kind of hidden.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new 
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile

Review comment:
       Do we still need this? It looks like all accesses are protected with the 
lock.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to