dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r837137526
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
+ val allRemovedPartitionsMap = mutable.Map[TopicPartition,
InitialFetchState]()
fetcherThreadMap.forKeyValue { (id, thread) =>
val partitionStates = removeFetcherForPartitions(thread.partitions)
if (id.fetcherId >= newSize)
thread.shutdown()
- val fetchStates = partitionStates.map { case (topicPartition,
currentFetchState) =>
- val initialFetchState = InitialFetchState(currentFetchState.topicId,
thread.sourceBroker,
- currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
- initOffset = currentFetchState.fetchOffset)
- topicPartition -> initialFetchState
+ partitionStates.forKeyValue {
+ (topicPartition, currentFetchState) =>
Review comment:
nit: We usually keep `(topicPartition, currentFetchState) =>` on the
previous line.
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
Review comment:
nit: Could we break this long line?
```
def resizeAndCheckFetcherPartitionDistribution(
fetcherManager: AbstractFetcherManager[AbstractFetcherThread],
topicPartitions: Set[TopicPartition],
fetcherNum: Int
): Unit = {
```
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+ val brokerId = Utils.abs(tp.topic().hashCode % 5)
+ val brokerEndPoint = new BrokerEndPoint(brokerId,
s"kafka-host-$brokerId", 9092)
+ tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+ }).toMap)
+ fetcherManager.resizeThreadPool(fetcherNum)
+ val ownedPartitions = mutable.Set.empty[TopicPartition]
+ fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId,
fetcherThread) =>
+ val fetcherId = brokerIdAndFetcherId.fetcherId
+ fetcherThread.partitions.foreach { tp =>
+ ownedPartitions += tp
+ assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+ }
+ }
+ // Verify that all partitions are owned by the fetcher threads.
+ assertEquals(topicPartitions, ownedPartitions)
+ }
+
+ def mockTopicPartition(topicNum: Int, partitionNum: Int):
Set[TopicPartition] = {
Review comment:
nit: Could we make this method private? How about calling it
`makeTopicPartitions`? The `TopicPartition`s are not mocked, so using `mock`
looks weird here.
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
Review comment:
nit: `topicPartitions.map(tp => {` -> `topicPartitions.map { tp => `
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -61,7 +61,9 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
private[server] def deadThreadCount: Int = lock synchronized {
fetcherThreadMap.values.count(_.isThreadFailed) }
def resizeThreadPool(newSize: Int): Unit = {
+ // Used to replace migratePartitions and solve the fetch problem
def migratePartitions(newSize: Int): Unit = {
+ val allRemovedPartitionsMap = mutable.Map[TopicPartition,
InitialFetchState]()
fetcherThreadMap.forKeyValue { (id, thread) =>
val partitionStates = removeFetcherForPartitions(thread.partitions)
Review comment:
@yufeiyan1220 Are you interested in doing this in your PR? We can do it as
a follow-up otherwise.
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+ val brokerId = Utils.abs(tp.topic().hashCode % 5)
Review comment:
nit: You can omit the parentheses after `topic`. We usually don't put
them for getters.
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+ val brokerId = Utils.abs(tp.topic().hashCode % 5)
+ val brokerEndPoint = new BrokerEndPoint(brokerId,
s"kafka-host-$brokerId", 9092)
Review comment:
Does using different broker ids bring any value to this test? If it
does, it might be good to also verify the broker id when we verify the fetcher id.
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
Review comment:
`testShrinkThreadPool` is almost identical to `testExpandThreadPool`.
Would it make sense to define `testResizeThreadPool(currentSize, newSize)` and
use it in both?
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+ val brokerId = Utils.abs(tp.topic().hashCode % 5)
+ val brokerEndPoint = new BrokerEndPoint(brokerId,
s"kafka-host-$brokerId", 9092)
+ tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+ }).toMap)
+ fetcherManager.resizeThreadPool(fetcherNum)
+ val ownedPartitions = mutable.Set.empty[TopicPartition]
+ fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId,
fetcherThread) =>
+ val fetcherId = brokerIdAndFetcherId.fetcherId
+ fetcherThread.partitions.foreach { tp =>
+ ownedPartitions += tp
+ assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+ }
+ }
+ // Verify that all partitions are owned by the fetcher threads.
+ assertEquals(topicPartitions, ownedPartitions)
+ }
+
+ def mockTopicPartition(topicNum: Int, partitionNum: Int):
Set[TopicPartition] = {
+ val res = mutable.Set[TopicPartition]()
+ val topicPrefix = "topic_"
+ for (i <- 0 to topicNum) {
+ val topic = topicPrefix + i
+ for (j <- 0 to partitionNum) {
+ res += new TopicPartition(topic, j)
+ }
+ }
+ res.toSet
+ }
+
+ class TestResizeFetcherThread(sourceBroker: BrokerEndPoint)
+ extends AbstractFetcherThread("test-resize-fetcher",
+ clientId = "mock-fetcher",
+ sourceBroker,
+ new FailedPartitions,
+ fetchBackOffMs = 0,
+ brokerTopicStats = new BrokerTopicStats) {
+ override protected def processPartitionData(topicPartition:
TopicPartition, fetchOffset: Long, partitionData: FetchData):
Option[LogAppendInfo] = {
Review comment:
nit: Could we add an empty line before this method?
##########
File path:
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
}
+
+ @Test
+ def testExpandThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 10) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 25)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ @Test
+ def testShrinkThreadPool(): Unit = {
+ val topicPartitions = mockTopicPartition(10, 100)
+ val fetcherManager = new
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager",
"fetcher-manager", 20) {
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new TestResizeFetcherThread(sourceBroker)
+ }
+ }
+ try {
+ resizeAndCheckFetcherPartitionDistribution(fetcherManager,
topicPartitions, 5)
+ } finally {
+ fetcherManager.closeAllFetchers()
+ }
+ }
+
+ def resizeAndCheckFetcherPartitionDistribution(fetcherManager:
AbstractFetcherManager[AbstractFetcherThread], topicPartitions:
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+ fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+ val brokerId = Utils.abs(tp.topic().hashCode % 5)
+ val brokerEndPoint = new BrokerEndPoint(brokerId,
s"kafka-host-$brokerId", 9092)
+ tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+ }).toMap)
+ fetcherManager.resizeThreadPool(fetcherNum)
+ val ownedPartitions = mutable.Set.empty[TopicPartition]
+ fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId,
fetcherThread) =>
+ val fetcherId = brokerIdAndFetcherId.fetcherId
+ fetcherThread.partitions.foreach { tp =>
+ ownedPartitions += tp
+ assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+ }
+ }
+ // Verify that all partitions are owned by the fetcher threads.
+ assertEquals(topicPartitions, ownedPartitions)
+ }
+
+ def mockTopicPartition(topicNum: Int, partitionNum: Int):
Set[TopicPartition] = {
+ val res = mutable.Set[TopicPartition]()
+ val topicPrefix = "topic_"
+ for (i <- 0 to topicNum) {
+ val topic = topicPrefix + i
+ for (j <- 0 to partitionNum) {
+ res += new TopicPartition(topic, j)
+ }
+ }
+ res.toSet
+ }
+
+ class TestResizeFetcherThread(sourceBroker: BrokerEndPoint)
+ extends AbstractFetcherThread("test-resize-fetcher",
+ clientId = "mock-fetcher",
+ sourceBroker,
+ new FailedPartitions,
+ fetchBackOffMs = 0,
+ brokerTopicStats = new BrokerTopicStats) {
Review comment:
* nit: Could we also name the parameter for `test-resize-fetcher`?
* nit: Could we align the parameters as well? We can either align all of
them on the first one or move the first one onto a new line.
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -20,13 +20,11 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger
-
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.message.NoCompressionCodec
import kafka.metrics.KafkaYammerMetrics
-import kafka.server.AbstractFetcherThread.ReplicaFetch
-import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
Review comment:
Could we revert the changes in this file as they are not necessary for the
patch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]