jsancio commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r460468241
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {

 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit

Review comment:
I would add a documentation comment to this method explaining that it is executed not by the controller thread but by some other thread.

##########
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##########
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

Review comment:
This comment applies to `clearAndPut`:
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  queue.forEach(_.preempt(processor))
  queue.clear()
  put(event)
}
```
I think there is a bug here where at most one event will be processed twice: once by `_.preempt(processor)` and once by `doWork`. I think we can fix this concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = {
  val events = ...
  // Drain and enqueue under the lock; keep the queued event so it can be returned.
  val queuedEvent = inLock(putLock) {
    queue.drainTo(events)
    put(event)
  }
  // Preempt the drained events outside the lock.
  events.forEach(_.preempt(processor))
  queuedEvent
}
```

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }

+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()

Review comment:
This method should be executed by the thread that is running this test. If you agree, there is no need to use an `AtomicInteger`.
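
To make the `drainTo` suggestion and the threading remark concrete, here is a minimal standalone sketch. It assumes nothing about Kafka's real classes: `Event`, `SketchEventManager`, and `preemptedBy` are hypothetical names invented for illustration. The idea it illustrates is that `LinkedBlockingQueue.drainTo` removes each element exactly once, so an event is either taken by a consumer or handed to the caller for preemption, and that preemption then runs on the thread that called `clearAndPut`.

```scala
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for a controller event; not Kafka's ControllerEvent.
final class Event(val name: String) {
  // Records which thread ran preempt(), if any.
  @volatile var preemptedBy: Option[String] = None

  def preempt(): Unit = {
    preemptedBy = Some(Thread.currentThread().getName)
  }
}

// Hypothetical, simplified event manager used only for this sketch.
final class SketchEventManager {
  private val putLock = new ReentrantLock()
  private val queue = new LinkedBlockingQueue[Event]()

  def put(event: Event): Event = {
    putLock.lock()
    try {
      queue.put(event)
      event
    } finally putLock.unlock()
  }

  def clearAndPut(event: Event): Event = {
    val drained = new ArrayList[Event]()
    putLock.lock()
    val queued =
      try {
        // drainTo moves every pending event into `drained`; an event that a
        // consumer already took is not seen here, so nothing is handled twice.
        queue.drainTo(drained)
        put(event) // ReentrantLock allows re-acquiring putLock here
      } finally putLock.unlock()
    // Preempt outside the lock, on the calling thread.
    drained.forEach(e => e.preempt())
    queued
  }

  def size: Int = queue.size
}

object PreemptSketch {
  def main(args: Array[String]): Unit = {
    val manager = new SketchEventManager
    val first = new Event("first")
    val second = new Event("second")
    manager.put(first)
    manager.put(second)
    manager.clearAndPut(new Event("shutdown"))
    // Both pending events were preempted by the thread that called clearAndPut.
    println(s"first=${first.preemptedBy}, second=${second.preemptedBy}, pending=${manager.size}")
  }
}
```

Because the drained events are preempted by the caller, a test that triggers the preemption from its own thread can count calls with a plain variable, which is the point of the `AtomicInteger` remark.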

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }

+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()
+    }
+
+    controller.eventManager.put(processedEvent)
+    controller.eventManager.put(preemptedEvent)
+    controller.eventManager.put(preemptedEvent)
+
+    doAnswer((_: InvocationOnMock) => {
+      latch.countDown()
+    }).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
The important part is that `latch.countDown` must happen after `initiateShutdown` has been called. At that point `isRunning` is already `false`, so the controller thread does not pick up a new event.