jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460468241



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
 
 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit

Review comment:
       I would add a documentation comment to this method explaining that it 
is executed not by the controller thread but by some other thread.

##########
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##########
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

Review comment:
       This comment applies to `clearAndPut`:
   
   ```scala
     def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
       queue.forEach(_.preempt(processor))
       queue.clear()
       put(event)
     }
   ```
   I think there is a bug here where at most one event will be processed twice: 
once by `_.preempt(processor)` and once by `doWork`. I think we can fix this 
concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.
   
   ```scala
     def clearAndPut(event: ControllerEvent): QueuedEvent = {
       val events = ...;
   
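       val queuedEvent = inLock(putLock) {
         queue.drainTo(events)
         put(event)
       }
   
       // Preempt the drained events (not the new `event`), then return the queued event.
       events.forEach(_.preempt(processor))
       queuedEvent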
     }
   ```
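To make the `drainTo` idea concrete, here is a minimal, self-contained sketch. `Event` and `EventQueue` are hypothetical stand-ins, not the actual `QueuedEvent` or `ControllerEventManager`, and the lock is taken by hand instead of with `inLock`. The point it illustrates is that `drainTo` removes elements from the queue atomically with respect to the consumer, so each event is either taken by the consumer or drained and preempted, never both.

```scala
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object DrainToSketch {
  // Hypothetical stand-in for a queued controller event.
  final class Event(val name: String) {
    var preempted = 0
    def preempt(): Unit = preempted += 1
  }

  // Hypothetical stand-in for the event manager's queue handling.
  final class EventQueue {
    private val putLock = new ReentrantLock()
    private val queue = new LinkedBlockingQueue[Event]()

    def put(event: Event): Event = {
      putLock.lock()
      try { queue.put(event); event } finally putLock.unlock()
    }

    def clearAndPut(event: Event): Event = {
      val drained = new ArrayList[Event]()
      putLock.lock()
      try {
        // Move everything currently queued into `drained`; a concurrent
        // consumer can no longer take any of these events.
        queue.drainTo(drained)
        queue.put(event)
      } finally putLock.unlock()
      // Preempt only what was actually removed from the queue.
      drained.forEach(e => e.preempt())
      event
    }

    def size: Int = queue.size
  }
}
```

With two events queued, `clearAndPut(c)` preempts each of them exactly once and leaves only `c` in the queue.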

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()

Review comment:
       This method should be executed by the thread that is running this test. 
If you agree, no need to use an `AtomicInteger`.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()
+    }
+
+    controller.eventManager.put(processedEvent)
+    controller.eventManager.put(preemptedEvent)
+    controller.eventManager.put(preemptedEvent)
+
+    doAnswer((_: InvocationOnMock) => {
+      latch.countDown()
+    }).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
       The important part is that we want to call `latch.countDown` only after 
`initiateShutdown` has been called. By then `isRunning` is `false`, so the 
controller thread doesn't pick up a new event.
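A simplified sketch of the ordering I have in mind. Everything here is hypothetical: the worker loop, the `isRunning` flag and the `started` latch only approximate what the controller event thread does. The first task blocks on `latch`, the flag is flipped while it is still blocked, and only then is the latch released, so the worker finishes that one task and exits without touching the remaining two.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object ShutdownOrderingSketch {
  /** Returns (events processed, events left in the queue). */
  def demo(): (Int, Int) = {
    val isRunning = new AtomicBoolean(true) // stands in for the thread's `isRunning`
    val started = new CountDownLatch(1)     // signals that the first event is in progress
    val latch = new CountDownLatch(1)       // keeps the first event blocked, as in the test
    val processed = new AtomicInteger(0)
    val queue = new LinkedBlockingQueue[Runnable]()

    // One blocking event followed by two events that should never run.
    queue.put(new Runnable { def run(): Unit = { started.countDown(); latch.await() } })
    queue.put(new Runnable { def run(): Unit = () })
    queue.put(new Runnable { def run(): Unit = () })

    val worker = new Thread(new Runnable {
      def run(): Unit = {
        // Only pick up another event while the thread is still running.
        while (isRunning.get) {
          val task = queue.poll(100, TimeUnit.MILLISECONDS)
          if (task != null) { task.run(); processed.incrementAndGet() }
        }
      }
    })
    worker.start()

    started.await()      // the worker is now blocked inside the first event
    isRunning.set(false) // plays the role of `initiateShutdown`
    latch.countDown()    // release the first event only after shutdown began
    worker.join()
    (processed.get, queue.size)
  }
}
```

If `latch.countDown()` ran before the flag was flipped, the worker could loop around and process the remaining events before shutdown took effect.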





