yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r836585455



##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+    try {
+      threadManager.addFetcherForPartitions(topicPartition.map(_ -> 
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+      threadManager.resizeThreadPool(60)
+      val threadPartitionCount = threadManager.fetcherThreadMap.map {case 
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" -> 
thread.partitions.size}
+      val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+      threadManager.fetcherThreadMap.foreach {
+        case (brokerFetcher, thread) => {
+          val fetcherId = brokerFetcher.fetcherId
+          val tpSet = mutable.Set[TopicPartition]()
+          thread.partitions.foreach(tp => {
+            val id = threadManager.getFetcherId(tp)
+            if (!id.equals(fetcherId)) {
+              print(s"tp: $tp with fetcherId $fetcherId which should be $id, 
error when resizing threads.\n")
+              tpSet += tp
+            }
+          })
+          if (!tpSet.isEmpty)
+            tpsWithoutResize += fetcherId -> tpSet
+        }
+      }
+      print(s"Current fetcher assigned partition count Map: 
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+      // All partitions should be redistributed to fetchers with new thread 
number
+      assertEquals(0, tpsWithoutResize.size)
+    } finally {
+      threadManager.closeAllFetchers()
+    }
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val dict = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      for (j <- 0 to partitionNum) {
+        val topic = topicPrefix + dict(AdminUtils.rand.nextInt(i + 1))
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  class TestThreadResizeManager extends 
AbstractFetcherManager[TestResizeFetcherThread]("TestThreadResizeManager", 
"test-resize-thread", 10) {
+    override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): TestResizeFetcherThread = {
+      new TestResizeFetcherThread()
+    }
+  }
+
+  class TestResizeFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1, 
fetchBackOffMs: Int = 0)

Review comment:
       > We use `MockFetcherThread` in other tests. Could use it here as well 
instead of re-defining another one?
   
   I ran into a problem when I tried to use MockFetcherThread instead of my
newly defined one: it throws "java.lang.IllegalArgumentException:
Unknown partition", because MockFetcherThread is designed for testing replica
epoch and other features.
   I think I should put all the tests in AbstractFetcherManagerTest, without
testing other features. Actually, I used the old "DummyFetcherThread" for
testing in the old version...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to