jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564707693

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,28 +16,41 @@
  */
 package kafka.raft

+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet

-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots

 import scala.compat.java8.OptionConverters._

-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to efficiently notify the polling thread
+  // when snapshots are created.
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {

+  private[this] var oldestSnapshotId = snapshotIds

Review comment:
Sorry for the delayed reply. I missed this comment for some reason.

At the moment we need to keep this variable/reference. In this PR, the `oldestSnapshotId` can be anywhere in the `snapshotIds` set.
I think that when we implement https://issues.apache.org/jira/browse/KAFKA-12205 we can remove this variable. That issue tracks the work needed to delete any snapshot that is less than the log start offset.
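
As a rough illustration of why the variable could go away after KAFKA-12205 (a sketch only, not the code in this PR): once every snapshot id below the log start offset is removed from the set, the oldest snapshot is simply the first element of the sorted set. `SnapshotId`, `SnapshotIdSketch`, `deleteBefore`, and `oldest` are hypothetical names; `SnapshotId` stands in for `raft.OffsetAndEpoch`, and the ordering (offset, then epoch) is an assumption.

```scala
import java.util.Comparator
import java.util.NoSuchElementException
import java.util.concurrent.ConcurrentSkipListSet

// Hypothetical stand-in for raft.OffsetAndEpoch.
final case class SnapshotId(offset: Long, epoch: Int)

object SnapshotIdSketch {
  // Assumed ordering: by offset first, then by epoch.
  val ordering: Comparator[SnapshotId] = new Comparator[SnapshotId] {
    override def compare(a: SnapshotId, b: SnapshotId): Int = {
      val byOffset = java.lang.Long.compare(a.offset, b.offset)
      if (byOffset != 0) byOffset else Integer.compare(a.epoch, b.epoch)
    }
  }

  // Hypothetical helper: remove every id whose offset is strictly below logStartOffset.
  // headSet returns a view backed by the set, so clearing the view removes those ids.
  def deleteBefore(ids: ConcurrentSkipListSet[SnapshotId], logStartOffset: Long): Unit =
    ids.headSet(SnapshotId(logStartOffset, Int.MinValue), false).clear()

  // After the cleanup above, the oldest snapshot is the first element of the set,
  // so no separate oldestSnapshotId field has to be tracked.
  def oldest(ids: ConcurrentSkipListSet[SnapshotId]): Option[SnapshotId] =
    try Some(ids.first())
    catch { case _: NoSuchElementException => None }
}
```

Without that cleanup the set may still hold ids below the log start offset, so `first()` is not guaranteed to be the oldest usable snapshot, which is why the separate reference is kept for now.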