hachikuji commented on a change in pull request #9749: URL: https://github.com/apache/kafka/pull/9749#discussion_r554105440
########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -122,43 +121,47 @@ class DefaultAlterIsrManager( override def start(): Unit = { controllerChannelManager.start() - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) } override def shutdown(): Unit = { controllerChannelManager.shutdown() } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { + if (inflightRequest.compareAndSet(false, true)) { + // optimistically set the inflight flag even though we haven't sent the request yet + scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS) Review comment: If we are only waiting 1ms, would it be simpler to call `propagateIsrChanges` directly? Similarly after receiving a response with no error. ########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -122,43 +121,47 @@ class DefaultAlterIsrManager( override def start(): Unit = { controllerChannelManager.start() - scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS) } override def shutdown(): Unit = { controllerChannelManager.shutdown() } override def submit(alterIsrItem: AlterIsrItem): Boolean = { - unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) { + if (inflightRequest.compareAndSet(false, true)) { Review comment: This seems to introduce a race condition. Say we have an inflight request. Is the following sequence possible? 1. the response returns and the io thread calls `propagateIsrChanges` and sees an empty `unsentIsrUpdates` 2. request thread calls submit and inserts a new item in `unsentIsrUpdates` 3. request thread fails `compareAndSet` on `inflightRequest` 4. io thread clears `inflightRequest` It seems like we might need a lock. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org