yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r837340707



##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)
+      val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)

Review comment:
       I think each topic partition should be bound with a `BrokerEndPoint` 
which should its leader. Without leader changed, each topic partition should 
remain its `BrokerEndPoint` after resizing.  It make sense. 
   
   It seems always satisfied in `addFetcherForPartitions`, but I will add it 
here in case of `addFetcherForPartitions` changed in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to