jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564707693



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,28 +16,41 @@
  */
 package kafka.raft
 
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet
 
-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, 
Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, 
Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the 
KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to 
efficiently notify the polling thread
+  // when snapshots are created.
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var oldestSnapshotId = snapshotIds

Review comment:
       Sorry for the delayed reply. I missed this comment for some reason.
   
   At the moment we need to keep this variable/reference. In this PR, the 
`oldestSnapshotId` can be anywhere in the `snapshotIds` set. I think that when 
we implement https://issues.apache.org/jira/browse/KAFKA-12205 we can remove 
this variable.
   
   https://issues.apache.org/jira/browse/KAFKA-12205 tracks the work needed to 
delete any snapshot that is less than the log start offset.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to