yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r836342442



##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+    try {
+      threadManager.addFetcherForPartitions(topicPartition.map(_ -> 
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+      threadManager.resizeThreadPool(60)
+      val threadPartitionCount = threadManager.fetcherThreadMap.map {case 
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" -> 
thread.partitions.size}
+      val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+      threadManager.fetcherThreadMap.foreach {
+        case (brokerFetcher, thread) => {
+          val fetcherId = brokerFetcher.fetcherId
+          val tpSet = mutable.Set[TopicPartition]()
+          thread.partitions.foreach(tp => {
+            val id = threadManager.getFetcherId(tp)
+            if (!id.equals(fetcherId)) {
+              print(s"tp: $tp with fetcherId $fetcherId which should be $id, 
error when resizing threads.\n")
+              tpSet += tp
+            }
+          })
+          if (!tpSet.isEmpty)
+            tpsWithoutResize += fetcherId -> tpSet
+        }
+      }
+      print(s"Current fetcher assigned partition count Map: 
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+      // All partitions should be redistributed to fetchers with new thread 
number
+      assertEquals(0, tpsWithoutResize.size)
+    } finally {
+      threadManager.closeAllFetchers()
+    }
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val dict = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      for (j <- 0 to partitionNum) {
+        val topic = topicPrefix + dict(AdminUtils.rand.nextInt(i + 1))
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  class TestThreadResizeManager extends 
AbstractFetcherManager[TestResizeFetcherThread]("TestThreadResizeManager", 
"test-resize-thread", 10) {
+    override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): TestResizeFetcherThread = {
+      new TestResizeFetcherThread()
+    }
+  }
+
+  class TestResizeFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1, 
fetchBackOffMs: Int = 0)

Review comment:
       > @yufeiyan1220 Great find! I am really surprised to see that we don't 
have any tests for this logic. The suggested approach seems reasonable to me. I 
left a few suggestions.
   
   Thank you for your comment! I'll refactor my code and push it later!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to