jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660061129



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -231,20 +239,31 @@ class FetchSession(val id: Int,
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
   def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset)
+    Option(partitionMap.find(new CachedPartition(topicPartition,
+      sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset)
   }
 
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
              toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
+    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+      // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID.
+      // If the topic already existed, check that its ID is consistent.
+      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+      val newCachedPart = new CachedPartition(topicPart, id, reqData)
+      if (id != Uuid.ZERO_UUID) {
+        val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)

Review comment:
       If a topic ID changes, the FetchSession will become a FetchErrorSession
and close. I can change this to `putIfAbsent` if that makes things clearer, but
all of this state goes away anyway once the error closes the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to