dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r836155421



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -61,7 +61,9 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   private[server] def deadThreadCount: Int = lock synchronized { 
fetcherThreadMap.values.count(_.isThreadFailed) }
 
   def resizeThreadPool(newSize: Int): Unit = {
+    // Used to replace migratePartitions and solve the fetch problem

Review comment:
       nit: The comment does not seem necessary. Should we remove it?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -61,7 +61,9 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   private[server] def deadThreadCount: Int = lock synchronized { 
fetcherThreadMap.values.count(_.isThreadFailed) }
 
   def resizeThreadPool(newSize: Int): Unit = {
+    // Used to replace migratePartitions and solve the fetch problem
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
         val partitionStates = removeFetcherForPartitions(thread.partitions)

Review comment:
       While we are here: `removeFetcherForPartitions` iterates over all the 
fetchers to remove the `thread.partitions`. That is not necessary because we 
know that the partitions all belong to a single fetcher. It seems that we could 
directly use `thread.removePartitions(thread.partitions)` and add 
`failedPartitions.clear()` before calling 
`addFetcherForPartitions(allRemovedPartitionsMap)`. What do you think?
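
To illustrate the shape of that suggestion, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `ResizeSketch`, `Tp`, `SketchFetcherManager` and `fetcherIdFor` are hypothetical stand-ins, the hash-based assignment is only assumed to resemble what `getFetcherId` does, and the `failedPartitions.clear()` step is left out. The point is only the ordering: each fetcher gives up its own partitions, everything is collected into one map, and the re-add happens once after the iteration.

```scala
import scala.collection.mutable

object ResizeSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
  final case class Tp(topic: String, partition: Int)

  // Hypothetical, heavily simplified model of a fetcher manager
  final class SketchFetcherManager(initialSize: Int) {
    private var numFetchers = initialSize
    // fetcherId -> partitions (with their fetch offset) owned by that fetcher
    val fetchers = mutable.Map.empty[Int, mutable.Map[Tp, Long]]

    // Hash-based assignment; an assumption, not the real getFetcherId
    def fetcherIdFor(tp: Tp): Int =
      Math.floorMod(31 * tp.topic.hashCode + tp.partition, numFetchers)

    def addPartitions(states: collection.Map[Tp, Long]): Unit =
      states.foreach { case (tp, offset) =>
        fetchers.getOrElseUpdate(fetcherIdFor(tp), mutable.Map.empty[Tp, Long]).update(tp, offset)
      }

    def resize(newSize: Int): Unit = {
      val allRemoved = mutable.Map.empty[Tp, Long]
      // Each fetcher hands over only its own partitions; no scan over the others
      fetchers.values.foreach { owned =>
        allRemoved ++= owned
        owned.clear()
      }
      fetchers.clear()
      numFetchers = newSize
      // Re-add everything in one pass, after the iteration has finished
      addPartitions(allRemoved)
    }
  }
}
```

Because the re-add happens outside the loop, the map being iterated is never modified while it is traversed, and every partition is placed using the new thread count.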

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -17,16 +17,16 @@
 
 package kafka.server
 
+import kafka.admin.AdminUtils

Review comment:
       nit: Could we move this one next to the other `kafka.*` imports?

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+    try {
+      threadManager.addFetcherForPartitions(topicPartition.map(_ -> 
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+      threadManager.resizeThreadPool(60)
+      val threadPartitionCount = threadManager.fetcherThreadMap.map {case 
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" -> 
thread.partitions.size}
+      val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+      threadManager.fetcherThreadMap.foreach {
+        case (brokerFetcher, thread) => {
+          val fetcherId = brokerFetcher.fetcherId
+          val tpSet = mutable.Set[TopicPartition]()
+          thread.partitions.foreach(tp => {
+            val id = threadManager.getFetcherId(tp)
+            if (!id.equals(fetcherId)) {
+              print(s"tp: $tp with fetcherId $fetcherId which should be $id, 
error when resizing threads.\n")
+              tpSet += tp
+            }
+          })
+          if (!tpSet.isEmpty)
+            tpsWithoutResize += fetcherId -> tpSet
+        }
+      }
+      print(s"Current fetcher assigned partition count Map: 
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+      // All partitions should be redistributed to fetchers with new thread 
number
+      assertEquals(0, tpsWithoutResize.size)

Review comment:
       It seems that we should clean up this part. We usually don't keep print 
statements in tests. Overall, I think that we should verify two invariants here:
   1) All the partitions must be owned by a thread; and
   2) The partitions must be owned by the correct thread.
   
   I would simplify it as follows:
   ```scala
   val ownedPartitions = mutable.Set.empty[TopicPartition]
   threadManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) =>
     val fetcherId = brokerIdAndFetcherId.fetcherId
     fetcherThread.partitions.foreach { tp =>
       ownedPartitions += tp
       // Verify that the partition is owned by the expected fetcher thread.
       assertEquals(threadManager.getFetcherId(tp), fetcherId)
     }
   }
   // Verify that all partitions are owned by the fetcher threads.
   assertEquals(topicPartition, ownedPartitions)
   ```
   
   Note that I haven't tested this code.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -72,9 +74,11 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
             initOffset = currentFetchState.fetchOffset)
           topicPartition -> initialFetchState
         }
-        addFetcherForPartitions(fetchStates)
+        allRemovedPartitionsMap ++= fetchStates

Review comment:
       Building `fetchStates` is not necessary anymore. It seems that we could 
replace `.map` by `.forKeyValue` and directly add partitions to 
`allRemovedPartitionsMap`. Would this work?
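
As a rough illustration of the difference, the sketch below writes each entry straight into a shared accumulator instead of building a per-thread map first. It uses plain Scala collections: `AccumulateSketch`, `Tp` and `FetchState` are hypothetical stand-ins, and `foreach { case (k, v) => ... }` stands in for Kafka's `forKeyValue` helper.

```scala
import scala.collection.mutable

object AccumulateSketch {
  // Hypothetical stand-ins for the per-partition state removed during a resize
  final case class Tp(topic: String, partition: Int)
  final case class FetchState(leaderEpoch: Int, fetchOffset: Long)

  // Collects the fetch offset of every partition, across all threads, into one map
  def collect(perThread: Seq[Map[Tp, FetchState]]): mutable.Map[Tp, Long] = {
    val all = mutable.Map.empty[Tp, Long]
    perThread.foreach { partitionStates =>
      // No intermediate map: each entry goes directly into the accumulator
      partitionStates.foreach { case (tp, state) =>
        all.update(tp, state.fetchOffset)
      }
    }
    all
  }
}
```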

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+    try {
+      threadManager.addFetcherForPartitions(topicPartition.map(_ -> 
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+      threadManager.resizeThreadPool(60)
+      val threadPartitionCount = threadManager.fetcherThreadMap.map {case 
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" -> 
thread.partitions.size}
+      val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+      threadManager.fetcherThreadMap.foreach {
+        case (brokerFetcher, thread) => {
+          val fetcherId = brokerFetcher.fetcherId
+          val tpSet = mutable.Set[TopicPartition]()
+          thread.partitions.foreach(tp => {
+            val id = threadManager.getFetcherId(tp)
+            if (!id.equals(fetcherId)) {
+              print(s"tp: $tp with fetcherId $fetcherId which should be $id, 
error when resizing threads.\n")
+              tpSet += tp
+            }
+          })
+          if (!tpSet.isEmpty)
+            tpsWithoutResize += fetcherId -> tpSet
+        }
+      }
+      print(s"Current fetcher assigned partition count Map: 
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+      // All partitions should be redistributed to fetchers with new thread 
number
+      assertEquals(0, tpsWithoutResize.size)
+    } finally {
+      threadManager.closeAllFetchers()
+    }
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val dict = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      for (j <- 0 to partitionNum) {
+        val topic = topicPrefix + dict(AdminUtils.rand.nextInt(i + 1))

Review comment:
       Does using a dict bring any value? If not, I would just use `i` as a 
suffix here.
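
For example, a deterministic version could look roughly like the sketch below. `TopicPartitionSketch` and `Tp` are hypothetical stand-ins so that the snippet runs without Kafka on the classpath, and it uses `until` rather than the inclusive `to` from the diff, which is an assumption about the intended counts.

```scala
object TopicPartitionSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
  final case class Tp(topic: String, partition: Int)

  // Topic names are derived from the loop index, so the result is reproducible
  def mockTopicPartitions(topicNum: Int, partitionNum: Int): Set[Tp] =
    (for {
      i <- 0 until topicNum
      j <- 0 until partitionNum
    } yield Tp(s"topic_$i", j)).toSet
}
```

With index-based names every `(topic, partition)` pair is distinct, so the set always holds exactly `topicNum * partitionNum` entries. With randomly chosen suffixes, names can collide and the set size varies between runs.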

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()
+    try {
+      threadManager.addFetcherForPartitions(topicPartition.map(_ -> 
InitialFetchState(None, new BrokerEndPoint(0, "localhost", 9092), 0, 1)).toMap)
+
+      threadManager.resizeThreadPool(60)
+      val threadPartitionCount = threadManager.fetcherThreadMap.map {case 
(brokerFetcher, thread) => s"[${brokerFetcher.fetcherId}]${thread.getName}" -> 
thread.partitions.size}
+      val tpsWithoutResize = mutable.Map[Int, Set[TopicPartition]]()
+      threadManager.fetcherThreadMap.foreach {
+        case (brokerFetcher, thread) => {
+          val fetcherId = brokerFetcher.fetcherId
+          val tpSet = mutable.Set[TopicPartition]()
+          thread.partitions.foreach(tp => {
+            val id = threadManager.getFetcherId(tp)
+            if (!id.equals(fetcherId)) {
+              print(s"tp: $tp with fetcherId $fetcherId which should be $id, 
error when resizing threads.\n")
+              tpSet += tp
+            }
+          })
+          if (!tpSet.isEmpty)
+            tpsWithoutResize += fetcherId -> tpSet
+        }
+      }
+      print(s"Current fetcher assigned partition count Map: 
$threadPartitionCount\nFetcher partition without resize: $tpsWithoutResize")
+      // All partitions should be redistributed to fetchers with new thread 
number
+      assertEquals(0, tpsWithoutResize.size)
+    } finally {
+      threadManager.closeAllFetchers()
+    }
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val dict = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      for (j <- 0 to partitionNum) {
+        val topic = topicPrefix + dict(AdminUtils.rand.nextInt(i + 1))
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  class TestThreadResizeManager extends 
AbstractFetcherManager[TestResizeFetcherThread]("TestThreadResizeManager", 
"test-resize-thread", 10) {
+    override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): TestResizeFetcherThread = {
+      new TestResizeFetcherThread()
+    }
+  }
+
+  class TestResizeFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1, 
fetchBackOffMs: Int = 0)

Review comment:
       We use `MockFetcherThread` in other tests. Could we use it here as well 
instead of defining another one?

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {

Review comment:
       This test would be better placed in `AbstractFetcherManagerTest`. Could 
we name it `testExpandThreadPool` and also add another test 
`testShrinkThreadPool` to test the opposite?

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1333,4 +1333,93 @@ class AbstractFetcherThreadTest {
 
   }
 
+  @Test
+  def testResize(): Unit = {
+
+    val topicPartition = mockTopicPartition(10, 100)
+    val threadManager: TestThreadResizeManager = new TestThreadResizeManager()

Review comment:
       In `AbstractFetcherManagerTest`, you will see that we usually create an 
anonymous class for the `AbstractFetcherManager`. I would follow the same 
pattern here.
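
For readers unfamiliar with the pattern, this is the general shape of an anonymous subclass in Scala. `AnonymousManagerSketch` and `SketchManager` are hypothetical and far simpler than `AbstractFetcherManager`; the sketch only shows how the abstract factory method is overridden inline rather than in a named test-only class.

```scala
object AnonymousManagerSketch {
  // Hypothetical, heavily simplified stand-in for an abstract manager
  abstract class SketchManager[T](val name: String, numFetchers: Int) {
    def createFetcherThread(fetcherId: Int): T
    def threads: Seq[T] = (0 until numFetchers).map(createFetcherThread)
  }

  // Anonymous subclass defined at the point of use
  val manager: SketchManager[String] = new SketchManager[String]("sketch-manager", 2) {
    override def createFetcherThread(fetcherId: Int): String = s"fetcher-$fetcherId"
  }
}
```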





