yufeiyan1220 commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r839161947
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
+ val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]()
fetcherThreadMap.forKeyValue { (id, thread) =>
- val partitionStates = removeFetcherForPartitions(thread.partitions)
+ val partitionStates = thread.removeAllPartitions()
Review comment:
I think no NPE is thrown unless `partitionStates.partitionStateMap` contains
a null value. `partitionMapLock` makes sure that no other thread changes
`partitionStates` during this process. The original version needed to filter
out null values because `partitionStates.stateValue` can return null when the
partition is not included in `partitionStates.partitionStateMap`.
I am not sure; maybe I should add the filter logic as well, or just implement
it by passing all partitions to `removePartitions`.
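
To make the difference concrete, here is a minimal sketch of the two removal styles. It assumes Scala 2.13 and uses hypothetical stand-ins (`Tp`, `FetchState`, `removeByLookup`, `removeAll`) rather than Kafka's real classes, and it leaves out the locking. It only illustrates why a per-key lookup needs a null filter while draining the map's own entries does not, as long as no null value was stored.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-ins for TopicPartition and the per-partition fetch state.
final case class Tp(topic: String, partition: Int)
final case class FetchState(offset: Long)

object NullFilterSketch {
  // Style 1: remove by looking up each requested partition.
  // java.util.Map#remove returns null for an absent key, so the result
  // must be filtered; Option(...) turns null into None here.
  def removeByLookup(states: JLinkedHashMap[Tp, FetchState],
                     requested: Set[Tp]): Map[Tp, FetchState] =
    requested.iterator.flatMap { tp =>
      Option(states.remove(tp)).map(state => tp -> state)
    }.toMap

  // Style 2: drain the map's own entries.
  // Every key comes from the map itself, so a null can only appear
  // if a null value was explicitly stored.
  def removeAll(states: JLinkedHashMap[Tp, FetchState]): Map[Tp, FetchState] = {
    val drained = states.asScala.toMap // immutable copy taken before clearing
    states.clear()
    drained
  }
}
```

In the real code both styles would run while holding `partitionMapLock`, so the map cannot change between the copy and the clear.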