jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r655647688



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var error = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          if (partitionIter.next().getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code())
+            error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
       @lbradstreet and I discussed this a bit. It seems that the metadata 
cache may be less accurate than the log itself and that is why we did away with 
the metadata check. I am also a little unsure (I'd have to check the code) but 
I'm not sure if the topicId can change. Are we saying that the partition and/or 
the underlying log can change in this code block? I think we can say we will 
read from the partition with that ID.
   
   ```
   val partition = getPartitionOrException(tp)
   val fetchTimeMs = time.milliseconds
   
   // Check if topic ID from the fetch request/session matches the ID in the log
   if (!hasConsistentTopicId(topicIdFromSession(partition.topic), 
partition.topicId))
     throw new InconsistentTopicIdException("Topic ID in the fetch session did 
not match the topic ID in the log.")
     . 
     .
     .
     val readInfo: LogReadInfo = partition.readRecords(
               lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
               fetchOffset = fetchInfo.fetchOffset,
               currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
               maxBytes = adjustedMaxBytes,
               fetchIsolation = fetchIsolation,
               fetchOnlyFromLeader = fetchOnlyFromLeader,
               minOneMessage = minOneMessage)
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to