jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r627514050



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +438,27 @@ class IncrementalFetchContext(private val time: Time,
         val topicPart = element.getKey
         val respData = element.getValue
         val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
-        val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
-        if (mustRespond) {
+
+        // If we have a situation where there is a valid ID on the partition, but it does not match
+        // the ID in topic IDs (likely due to topic deletion and re-creation) or there is no valid topic
+        // ID on the broker (topic deleted or broker received a metadataResponse without IDs),
+        // remove the cached partition from partitionMap and from the response.

Review comment:
       This is a good point. In general, I think I need to go through the
session logic and work out how it handles the different scenarios. (For
example, what happens when a session receives requests of different versions,
and should we allow that at all?) Depending on the answer, we may want topicId
to be a var, so that the ID can be updated when the request version changes.
I'll write up a summary of the logic I have in mind once I've worked it out.
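
For reference, the check described in the diff comment above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: `StaleCheck`, `TopicId`, `CachedPart`, `ZeroId`, and `isStale` are hypothetical stand-ins, and the sketch reads the comment as "the partition has a valid ID, and the broker's ID for that topic is either different or missing".

```scala
// Hypothetical stand-ins for illustration; these are not Kafka's actual classes.
object StaleCheck {
  final case class TopicId(value: String)

  // Assumed sentinel meaning "no topic ID known" for this partition.
  val ZeroId: TopicId = TopicId("")

  final case class CachedPart(topic: String, partition: Int, topicId: TopicId)

  /** True when the cached partition should be removed from the session's
    * partition map and left out of the response. */
  def isStale(cached: CachedPart, brokerTopicIds: Map[String, TopicId]): Boolean =
    cached.topicId != ZeroId && (brokerTopicIds.get(cached.topic) match {
      case Some(id) => id != cached.topicId // mismatch: topic likely deleted and re-created
      case None     => true                 // broker has no valid ID for this topic
    })
}
```

Because the ID is an immutable field here, a session that switches request versions would have to replace the cached entry rather than update it in place, which is the trade-off behind making topicId a var.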





