dajac commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r689770112
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########

@@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+      // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+      // don't update state for any partition that may have already been migrated to another thread.
+      val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) =>
+        partitionStates.contains(tp)
+      }

Review comment:
Did you consider pushing this down into `maybeTruncateToEpochEndOffsets`? This would avoid creating an extra collection here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org