jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660062652



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
+  * @param version            The version of the request.
   * @param fetchData          The partition data from the fetch request.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
+                       private val version: Short,
                        private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-    def createNewSession: FetchSession.CACHE_MAP = {
+    var error = Errors.NONE
+    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
+        if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+          error = Errors.INCONSISTENT_TOPIC_ID
         val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+        if (id != Uuid.ZERO_UUID)
+          sessionTopicIds.put(part.topic, id)
       }
-      cachedPartitions
+      (cachedPartitions, sessionTopicIds)
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-        updates.size, () => createNewSession)
+        updates.size, version, () => createNewSession)
     debug(s"Full fetch context with session id $responseSessionId returning " +
       s"${partitionsToLogString(updates.keySet)}")
-    FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+    FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
       I think this goes back to the question of whether it is useful for us to 
have information on the specific partition that failed. If we do return a 
top-level error here, should we also return the error values for the other 
fields, as we do in FetchRequest.getErrorResponse?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to