jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660061129



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -231,20 +239,31 @@ class FetchSession(val id: Int,
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
   def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-    Option(partitionMap.find(new 
CachedPartition(topicPartition))).map(_.fetchOffset)
+    Option(partitionMap.find(new CachedPartition(topicPartition,
+      sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID)))).map(_.fetchOffset)
   }
 
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
              toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
+    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+      // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
+      // If the topic already existed, check that its ID is consistent.
+      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+      val newCachedPart = new CachedPartition(topicPart, id, reqData)
+      if (id != Uuid.ZERO_UUID) {
+        val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)

Review comment:
       If a topic ID changes, the FetchSession will become a FetchErrorSession 
and close. I can change this to putIfAbsent if it makes things clearer, but all 
of this state will go away upon an error and session close.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -193,18 +197,22 @@ class CachedPartition(val topic: String,
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id           The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  *                     are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  *                     FetchSessionCache#touch.
-  * @param epoch        The fetch session sequence number.
+  * @param id                 The unique fetch session ID.
+  * @param privileged         True if this session is privileged.  Sessions 
crated by followers
+  *                           are privileged; session created by consumers are 
not.
+  * @param partitionMap       The CachedPartitionMap.
+  * @param usesTopicIds       True if this session is using topic IDs
+  * @param sessionTopicIds    The mapping from topic name to topic ID for 
topics in the session.
+  * @param creationMs         The time in milliseconds when this session was 
created.
+  * @param lastUsedMs         The last used time in milliseconds.  This should 
only be updated by
+  *                           FetchSessionCache#touch.
+  * @param epoch              The fetch session sequence number.
   */
 class FetchSession(val id: Int,
                    val privileged: Boolean,
                    val partitionMap: FetchSession.CACHE_MAP,
+                   val usesTopicIds: Boolean,
+                   val sessionTopicIds: FetchSession.TOPIC_ID_MAP,

Review comment:
       I suppose it won't hurt :)

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
+  * @param version            The version of the request,
   * @param fetchData          The partition data from the fetch request.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
+                       private val version: Short,

Review comment:
       We can do that to make things clearer.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
+  * @param version            The version of the request,
   * @param fetchData          The partition data from the fetch request.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
+                       private val version: Short,
                        private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-    def createNewSession: FetchSession.CACHE_MAP = {
+    var error = Errors.NONE
+    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
+        if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+          error = Errors.INCONSISTENT_TOPIC_ID
         val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+        if (id != Uuid.ZERO_UUID)
+          sessionTopicIds.put(part.topic, id)
       }
-      cachedPartitions
+      (cachedPartitions, sessionTopicIds)
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-        updates.size, () => createNewSession)
+        updates.size, version, () => createNewSession)
     debug(s"Full fetch context with session id $responseSessionId returning " +
       s"${partitionsToLogString(updates.keySet)}")
-    FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+    FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
       I think this goes back to the question of whether it is useful for us to 
have information on the specific partition that failed. If we do this, should 
we also return the error values for the other fields as we do in 
FetchRequest.getErrorResponse?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var error = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          if (partitionIter.next().getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code())
+            error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
       I'm still not sure I follow "pending fetch request could still reference 
the outdated Partition object and therefore miss the topicId change". My 
understanding is that the log is the source of truth: we will read from the 
log if the topic ID matches, and not read if it doesn't. I see we could get an 
error erroneously if the partition didn't update in time, but I don't see us 
being able to read from the log due to a stale partition.
   
   Or are you referring to the getPartitionOrException(tp) call picking up a 
stale partition and both the request and the partition are stale? In this case, 
we will read from the log, but will identify it with its correct ID. The client 
will handle based on this.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
+  * @param version            The version of the request,
   * @param fetchData          The partition data from the fetch request.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
+                       private val version: Short,
                        private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-    def createNewSession: FetchSession.CACHE_MAP = {
+    var error = Errors.NONE
+    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
+        if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+          error = Errors.INCONSISTENT_TOPIC_ID
         val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+        if (id != Uuid.ZERO_UUID)
+          sessionTopicIds.put(part.topic, id)
       }
-      cachedPartitions
+      (cachedPartitions, sessionTopicIds)
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-        updates.size, () => createNewSession)
+        updates.size, version, () => createNewSession)
     debug(s"Full fetch context with session id $responseSessionId returning " +
       s"${partitionsToLogString(updates.keySet)}")
-    FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+    FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
       I guess the only issue with using FetchRequest.getErrorResponse is that 
we may have different topics in the response than in the request. 

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,39 +375,50 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
+  * @param version            The version of the request,
   * @param fetchData          The partition data from the fetch request.
+  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
+                       private val version: Short,
                        private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends 
FetchContext {
   override def getFetchOffset(part: TopicPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
-    def createNewSession: FetchSession.CACHE_MAP = {
+    var error = Errors.NONE
+    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
+        if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code())
+          error = Errors.INCONSISTENT_TOPIC_ID
         val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
+        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
+        if (id != Uuid.ZERO_UUID)
+          sessionTopicIds.put(part.topic, id)
       }
-      cachedPartitions
+      (cachedPartitions, sessionTopicIds)
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
-        updates.size, () => createNewSession)
+        updates.size, version, () => createNewSession)
     debug(s"Full fetch context with session id $responseSessionId returning " +
       s"${partitionsToLogString(updates.keySet)}")
-    FetchResponse.of(Errors.NONE, 0, responseSessionId, updates)
+    FetchResponse.of(error, 0, responseSessionId, updates, topicIds)

Review comment:
       I guess the only issue with using FetchRequest.getErrorResponse is that 
we may have different topics in the response than in the request. 
SessionErrorContext deals with this by simply having an empty response besides 
the top level error. I'm wondering if we should do something like this. 
(Likewise, with the UNKNOWN_TOPIC_ID error, should we also just send back an 
empty response?)

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -193,18 +197,22 @@ class CachedPartition(val topic: String,
   * Each fetch session is protected by its own lock, which must be taken 
before mutable
   * fields are read or modified.  This includes modification of the session 
partition map.
   *
-  * @param id           The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated 
by followers
-  *                     are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only 
be updated by
-  *                     FetchSessionCache#touch.
-  * @param epoch        The fetch session sequence number.
+  * @param id                 The unique fetch session ID.
+  * @param privileged         True if this session is privileged.  Sessions 
crated by followers
+  *                           are privileged; session created by consumers are 
not.
+  * @param partitionMap       The CachedPartitionMap.
+  * @param usesTopicIds       True if this session is using topic IDs
+  * @param sessionTopicIds    The mapping from topic name to topic ID for 
topics in the session.
+  * @param creationMs         The time in milliseconds when this session was 
created.
+  * @param lastUsedMs         The last used time in milliseconds.  This should 
only be updated by
+  *                           FetchSessionCache#touch.
+  * @param epoch              The fetch session sequence number.
   */
 class FetchSession(val id: Int,
                    val privileged: Boolean,
                    val partitionMap: FetchSession.CACHE_MAP,
+                   val usesTopicIds: Boolean,
+                   val sessionTopicIds: FetchSession.TOPIC_ID_MAP,

Review comment:
       Taking a second look, it seems like we just use partitionMap.size. I'm 
not sure whether it is useful to have the sessionTopicIds size (or whether the 
whole map is too much). I'm thinking of just including the usesTopicIds boolean.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -277,14 +277,18 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
         // is essential for them.
         Errors error = Errors.forException(e);
         List<FetchResponseData.FetchableTopicResponse> topicResponseList = new 
ArrayList<>();
-        data.topics().forEach(topic -> {
-            List<FetchResponseData.PartitionData> partitionResponses = 
topic.partitions().stream().map(partition ->
-                    FetchResponse.partitionResponse(partition.partition(), 
error)).collect(Collectors.toList());
-            topicResponseList.add(new 
FetchResponseData.FetchableTopicResponse()
-                    .setTopic(topic.topic())
-                    .setTopicId(topic.topicId())
-                    .setPartitions(partitionResponses));
-        });
+        // Since UNKNOWN_TOPIC_ID is a new error type only returned when topic 
ID requests are made (from newer clients),

Review comment:
       We need to do something like this to easily get the top level error with 
no partition response for UNKNOWN_TOPIC_ID. I think this works, but we may want 
a version check as well just to be safe.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -296,11 +276,24 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
         // may not be any partitions at all in the response.  For this reason, 
the top-level error code
         // is essential for them.
         Errors error = Errors.forException(e);
-        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
-        for (Map.Entry<TopicPartition, PartitionData> entry : 
fetchData.entrySet()) {
-            responseData.put(entry.getKey(), 
FetchResponse.partitionResponse(entry.getKey().partition(), error));
+        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new 
ArrayList<>();
+        // Since UNKNOWN_TOPIC_ID is a new error type only returned when topic 
ID requests are made (from newer clients),
+        // we can skip returning the error on all partitions and returning any 
partitions at all.
+        if (error != Errors.UNKNOWN_TOPIC_ID) {

Review comment:
       Yeah. I agree it is a bit weird. We can update as you mentioned.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +512,26 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var topLevelError = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          val entry = partitionIter.next()
+          if (entry.getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code()) {

Review comment:
       The topic ID should not change in the log once it is set. I think what 
you said in the last sentence is correct. My understanding is that if the log 
is closed, it can no longer be read from.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to