jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r627514050
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time, val topicPart = element.getKey val respData = element.getValue val cachedPart = session.partitionMap.find(new CachedPartition(topicPart)) - val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected) - if (mustRespond) { + + // If we have an situation where there is a valid ID on the partition, but it does not match + // the ID in topic IDs (likely due to topic deletion and re-creation) or there is no valid topic + // ID on the broker (topic deleted or broker received a metadataResponse without IDs), + // remove the cached partition from partitionMap and from the response. Review comment: This is a good point. In general, I think I need to go through the session logic for handling different scenarios. (What happens when we have a session with different version requests--should we allow that to happen, etc) Depending on this, we may want topicId to be a var (to update the ID when we change request versions). I'll write up a summary of the logic I'm thinking of when I get it worked out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org