yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r837644305



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val partitionStates = removeFetcherForPartitions(thread.partitions)
+        val partitionStates = thread.removeAllPartitions()
+        failedPartitions.removeAll(partitionStates.keySet)

Review comment:
       I am not very familiar with `FailedPartitions`, but I see that entries
are removed from it when topic partitions are added to fetchers and when
partitions are removed from fetchers. I think the union of `FailedPartitions`
and the partitions returned by `removeFetcherForPartitions` may be the whole
set of partitions. So should we just clear `FailedPartitions` here, or leave
the failed topic partitions in place?
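
To make the question concrete, here is a minimal sketch of the two options. It
is not Kafka code: `FailedPartitionsSketch`, `removeOnlyMigrated`, and
`clearAll` are hypothetical stand-ins, partitions are plain strings, and it
assumes (as guessed above, not verified) that a failed partition is no longer
owned by any fetcher thread.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the failed-partition set; NOT Kafka's class.
final class FailedPartitionsSketch {
  private val failed = mutable.Set[String]()
  def add(tp: String): Unit = failed += tp
  def removeAll(tps: Iterable[String]): Unit = failed --= tps
  def clear(): Unit = failed.clear()
  def contains(tp: String): Boolean = failed.contains(tp)
  def size: Int = failed.size
}

object FailedPartitionsDemo {
  // Option A: drop only the partitions that were just taken off the fetcher threads.
  def removeOnlyMigrated(failed: FailedPartitionsSketch, removed: Set[String]): Unit =
    failed.removeAll(removed)

  // Option B: forget every failed partition during the resize.
  def clearAll(failed: FailedPartitionsSketch): Unit =
    failed.clear()
}
```

Under that assumption the removed partitions and the failed partitions never
overlap, so option A leaves the failed set unchanged and only option B
actually empties it.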




