dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r837407430



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val partitionStates = removeFetcherForPartitions(thread.partitions)
+        val partitionStates = thread.removeAllPartitions()
+        failedPartitions.removeAll(partitionStates.keySet)

Review comment:
       As we end up removing all partitions from all the fetchers, it seems to 
me that we could just clear the whole set of failed partitions right before 
calling `addFetcherForPartitions(allRemovedPartitionsMap)`. We would need to 
add a `clear` method to `FailedPartitions`. Is my reasoning correct?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      val allPartitionState = partitionStates.partitionStateMap().asScala.toMap

Review comment:
       nit: You can omit the parenthesis of `partitionStateMap`.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50, 6)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10, 6)
+  }
+
+  def testResizeThreadPool(currentFetcherNum: Int, newFetcherNum: Int, 
brokerNum: Int): Unit = {

Review comment:
       nit: `currentFetcherSize` and `newFetcherSize` would be better names. 
`brokerNum` is constant, should we define it inline?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50, 6)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10, 6)
+  }
+
+  def testResizeThreadPool(currentFetcherNum: Int, newFetcherNum: Int, 
brokerNum: Int): Unit = {
+    val topicPartitions = makeTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherNum) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(topicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      fetcherManager.resizeThreadPool(newFetcherNum)
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(topicPartitions, ownedPartitions)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+
+  def makeTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  def getBrokerId(tp: TopicPartition, brokerNum: Int): Int = {
+    Utils.abs(31 * tp.topic.hashCode() + tp.partition) % brokerNum

Review comment:
       nit: Could we use `tp.hashCode`?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50, 6)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10, 6)
+  }
+
+  def testResizeThreadPool(currentFetcherNum: Int, newFetcherNum: Int, 
brokerNum: Int): Unit = {
+    val topicPartitions = makeTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherNum) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(topicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      fetcherManager.resizeThreadPool(newFetcherNum)
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(topicPartitions, ownedPartitions)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+
+  def makeTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  def getBrokerId(tp: TopicPartition, brokerNum: Int): Int = {

Review comment:
       nit: Could we make this method private?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50, 6)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10, 6)
+  }
+
+  def testResizeThreadPool(currentFetcherNum: Int, newFetcherNum: Int, 
brokerNum: Int): Unit = {
+    val topicPartitions = makeTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherNum) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(topicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      fetcherManager.resizeThreadPool(newFetcherNum)
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(topicPartitions, ownedPartitions)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+
+  def makeTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {

Review comment:
       nit: Could we make this method private?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50, 6)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10, 6)
+  }
+
+  def testResizeThreadPool(currentFetcherNum: Int, newFetcherNum: Int, 
brokerNum: Int): Unit = {
+    val topicPartitions = makeTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", currentFetcherNum) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(topicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      fetcherManager.resizeThreadPool(newFetcherNum)
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(topicPartitions, ownedPartitions)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+
+  def makeTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  def getBrokerId(tp: TopicPartition, brokerNum: Int): Int = {
+    Utils.abs(31 * tp.topic.hashCode() + tp.partition) % brokerNum
+  }
+
+  class TestResizeFetcherThread(sourceBroker: BrokerEndPoint)

Review comment:
       nit: Could we make this class private?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to