yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r836585455
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
}
+ @Test
+ def testResize(): Unit = {
+
+ val topicPartition = mockTopicPartition(10, 100)
+ val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+ try {
+ threadManager.addFetcherForPartitions(topicPartition.map(_ ->
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+ threadManager.resizeThreadPool(60)
+ val threadPartitionCount = threadManager.fetcherThreadMap.map {case
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" ->
thread.partitions.size}
+ val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+ threadManager.fetcherThreadMap.foreach {
+ case (brokerFetcher, thread) => {
+ val fetcherId = brokerFetcher.fetcherId
+ val tpSet = mutable.Set[TopicPartition]()
+ thread.partitions.foreach(tp => {
+ val id = threadManager.getFetcherId(tp)
+ if (!id.equals(fetcherId)) {
+ print(s"tp: $tp with fetcherId $fetcherId which should be $id,
error when resizing threads.\n")
+ tpSet += tp
+ }
+ })
+ if (!tpSet.isEmpty)
+ tpsWithoutResize += fetcherId -> tpSet
+ }
+ }
+ print(s"Current fetcher assigned partition count Map:
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+ // All partitions should be redistributed to fetchers with new thread
number
+ assertEquals(0, tpsWithoutResize.size)
+ } finally {
+ threadManager.closeAllFetchers()
+ }
+ }
+
+ def mockTopicPartition(topicNum: Int, partitionNum: Int):
Set[TopicPartition] = {
+ val res = mutable.Set[TopicPartition]()
+ val dict = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+ val topicPrefix = "topic_"
+ for (i <- 0 to topicNum) {
+ for (j <- 0 to partitionNum) {
+ val topic = topicPrefix + dict(AdminUtils.rand.nextInt(i + 1))
+ res += new TopicPartition(topic, j)
+ }
+ }
+ res.toSet
+ }
+
+ class TestThreadResizeManager extends
AbstractFetcherManager[TestResizeFetcherThread]("TestThreadResizeManager",
"test-resize-thread", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): TestResizeFetcherThread = {
+ new TestResizeFetcherThread()
+ }
+ }
+
+ class TestResizeFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1,
fetchBackOffMs: Int = 0)
Review comment:
> We use `MockFetcherThread` in other tests. Could use it here as well
instead of re-defining another one?
I ran into a problem when I tried to use MockFetcherThread instead of my
newly defined one: it throws "java.lang.IllegalArgumentException:
Unknown partition", because MockFetcherThread is designed for testing replica
epoch and other features.
I think I should put all of these tests in AbstractFetcherManagerTest, without
testing other features. Actually, I used the old "DummyFetcherThread" for this
test in the old version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]