jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r660068452
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time, if (session.epoch != expectedEpoch) { info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + s"got ${session.epoch}. Possible duplicate request.") - FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP) + FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP, Collections.emptyMap()) } else { + var error = Errors.NONE // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent + // It will also set the top-level error to INCONSISTENT_TOPIC_ID if any partitions had this error. val partitionIter = new PartitionIterator(updates.entrySet.iterator, true) while (partitionIter.hasNext) { - partitionIter.next() + if (partitionIter.next().getValue.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) + error = Errors.INCONSISTENT_TOPIC_ID Review comment: I'm still not sure I follow "pending fetch request could still reference the outdated Partition object and therefore miss the topicId change" My understanding is that the log is the source of truth and we will either read from the log if it matches and not read if it doesn't. I see we could get an error erroneously if the partition didn't update in time, but I don't see us being able to read from the log due to a stale partition. Or are you referring to the getPartitionOrException(tp) call picking up a stale partition and both the request and the partition are stale? In this case, we will read from the log, but will identify it with its correct ID. The client will handle based on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org