showuon commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839140933



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
+    partitionMapLock.lockInterruptibly()

Review comment:
       Maybe we can just pass all partitions into the existing `removePartitions` method?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val partitionStates = removeFetcherForPartitions(thread.partitions)
+        val partitionStates = thread.removeAllPartitions()

Review comment:
       Originally, we called `removeFetcherForPartitions`, which filters out
partitions whose `InitialFetchState` is `null`
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L742).
I'm not sure whether this is something we need to care about, but I think we'd
better keep the filter logic to avoid a potential NPE.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +218,115 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: 
Int, brokerNum: Int = 6): Unit = {
+    val fetchingTopicPartitions = makeTopicPartition(10, 100)
+    val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherSize) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker, failedPartitions)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp 
=>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      // Mark some of these partitions failed within resizing scope
+      
fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+      // Mark failed partitions out of resizing scope
+      failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+      fetcherManager.resizeThreadPool(newFetcherSize)
+
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+      val failedPartitionsAfterResize = 
fetcherManager.failedPartitions.failedPartitions()
+      // Verify that failed partitions within resizing scope are removed, 
otherwise retained

Review comment:
       Could you explain what `otherwise retained` means? I understand that the
failed partitions within the resizing scope should be removed, since we've
verified that, but I don't follow the latter part. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to