dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
Right. `updateAndMoveToEnd` removes the item first and then adds it back at the end of the `LinkedHashMap`. We rely on this to rotate the partitions in the `LinkedHashMap` so that all partitions are treated fairly. When we use a session, this is done on the broker as well, btw.

The new `update` method will only update the state, overwriting any existing state in the `LinkedHashMap`. This is what `map.put()` does. However, it does not change the order in the `LinkedHashMap`, which is good. We don't have to change the order when we set the topic id.
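
For reference, here is a minimal sketch of the ordering difference described above, using a plain `java.util.LinkedHashMap` in its default insertion-order mode. The class name `PartitionOrderSketch` and the two static helpers are hypothetical stand-ins written for illustration; they are not the actual Kafka `PartitionStates` code, and the string keys and values only stand in for partitions and their fetch state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionOrderSketch {
    // Hypothetical helper: remove the entry, then re-insert it so it lands at the tail.
    static <K, V> void updateAndMoveToEnd(Map<K, V> map, K key, V value) {
        map.remove(key);
        map.put(key, value);
    }

    // Hypothetical helper: overwrite the value in place. In an insertion-ordered
    // LinkedHashMap, re-putting an existing key does not change its position.
    static <K, V> void update(Map<K, V> map, K key, V value) {
        map.put(key, value);
    }

    // Snapshot of the current iteration order of the keys.
    static <K, V> List<K> keys(Map<K, V> map) {
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        Map<String, String> states = new LinkedHashMap<>();
        states.put("tp-0", "state-0");
        states.put("tp-1", "state-1");
        states.put("tp-2", "state-2");

        update(states, "tp-0", "state-0-with-topic-id");
        System.out.println(keys(states)); // [tp-0, tp-1, tp-2]

        updateAndMoveToEnd(states, "tp-0", "state-0-after-fetch");
        System.out.println(keys(states)); // [tp-1, tp-2, tp-0]
    }
}
```

Under these assumptions, setting a topic id through the in-place path leaves the rotation untouched, while the remove-then-put path is what moves a partition to the back of the iteration order.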