gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501944185
##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   // completed transactions whose markers are at offsets above the high watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = {
+    var latestStraySnapshot: Option[SnapshotFile] = None
+    val ss = loadSnapshots()
+    for (snapshot <- ss.values().asScala) {
+      val key = snapshot.offset
+      latestStraySnapshot match {
+        case Some(prev) =>
+          if (!segmentBaseOffsets.contains(key)) {
+            // this snapshot is now the largest stray snapshot.
+            prev.deleteIfExists()
+            ss.remove(prev.offset)
+            latestStraySnapshot = Some(snapshot)
+          }
+        case None =>
+          if (!segmentBaseOffsets.contains(key)) {
+            latestStraySnapshot = Some(snapshot)

Review comment:
We perform a check below which may cover this case. After setting the `snapshots` map, we look at the latest snapshot in the map. If the latest snapshot in the map is not equal to the `latestStraySnapshot`, we delete the `latestStraySnapshot`.
I think this is a bit confusing though, so it might be better to instead check directly that the offset of `latestStraySnapshot` is larger than the largest offset in `segmentBaseOffsets`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
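
A minimal sketch of the direct check suggested in the review comment above, to make the intended rule concrete. It is not the code from the PR: `Snapshot` and `StraySnapshots.partition` are hypothetical stand-ins for `SnapshotFile` and `removeStraySnapshots`, the function only decides what to keep and what to delete instead of touching files, and it assumes Scala 2.13 for `maxOption` and `maxByOption`.

```scala
// Hypothetical stand-in for the PR's SnapshotFile; only the offset matters here.
final case class Snapshot(offset: Long)

object StraySnapshots {
  /**
   * Splits snapshots into (keep, delete), both sorted by offset.
   *
   * A snapshot is kept if its offset matches a segment base offset, or if it is
   * the largest stray snapshot AND its offset is above every segment base offset.
   */
  def partition(snapshots: Seq[Snapshot],
                segmentBaseOffsets: Set[Long]): (Seq[Snapshot], Seq[Snapshot]) = {
    val maxSegmentBaseOffset: Option[Long] = segmentBaseOffsets.maxOption

    // Strays are snapshots with no segment at the same base offset.
    val (matched, strays) = snapshots.partition(s => segmentBaseOffsets.contains(s.offset))

    // The direct check: keep the latest stray only if it is above all segments.
    // Assumption: with no segments at all, the latest stray is kept.
    val keptStray: Option[Snapshot] =
      strays.maxByOption(_.offset).filter(s => maxSegmentBaseOffset.forall(s.offset > _))

    val toDelete = strays.filterNot(s => keptStray.contains(s))
    ((matched ++ keptStray).sortBy(_.offset), toDelete.sortBy(_.offset))
  }
}
```

For example, with snapshots at offsets 0, 5, 10 and 42 and segments at 0 and 10, the snapshot at 5 is deleted and the one at 42 survives, because 42 is above the largest segment base offset. Without the snapshot at 42, the stray at 5 is the latest stray but is still deleted, since it sits below the segment at 10.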