dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
       Right. `updateAndMoveToEnd` removes the item first and then adds it back 
at the end of the `LinkedHashMap`. We do rely on this to rotate the partitions 
in the `LinkedHashMap` in order to treat all partitions fairly. When we use a 
session, this is done on the broker as well btw.
   
   The new `update` method will only update the state and will overwrite any 
existing state in the `LinkedHashMap`. This is what `map.put()` does. However, 
it does not change the order in the `LinkedHashMap` which is good. We don't 
have to change the oder when we set the topic id.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to