jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r660068452



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var error = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          if (partitionIter.next().getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code())
+            error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
       I'm still not sure I follow "pending fetch request could still reference 
the outdated Partition object and therefore miss the topicId change" My 
understanding is that the log is the source of truth and we will either read 
from the log if it matches and not read if it doesn't. I see we could get an 
error erroneously if the partition didn't update in time, but I don't see us 
being able to read from the log due to a stale partition.
   
   Or are you referring to the getPartitionOrException(tp) call picking up a 
stale partition and both the request and the partition are stale? In this case, 
we will read from the log, but will identify it with its correct ID. The client 
will handle based on this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to