dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r838187998



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val partitionStates = removeFetcherForPartitions(thread.partitions)
+        val partitionStates = thread.removeAllPartitions()
+        failedPartitions.removeAll(partitionStates.keySet)

Review comment:
       Removing `failedPartitions.removeAll(partitionStates.keySet)` seems fine 
as we remove them anyway in `addPartitions`. Would it be possible to extend the 
test to include a few failed partitions and verify that they are indeed removed 
when the resizing is completed? We might be able to use `addFailedPartition`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to