jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r557563029
########## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ########## @@ -147,18 +199,106 @@ class KafkaMetadataLog( } override def createSnapshot(snapshotId: raft.OffsetAndEpoch): RawSnapshotWriter = { - FileRawSnapshotWriter.create(log.dir.toPath, snapshotId) + // TODO: Talk to Jason about truncation past the high-watermark since it can lead to truncation past snapshots. + // This can result in the leader having a snapshot that is less that the follower's snapshot. I think that the Raft + // Client checks against this and aborts. If so, then this check and exception is okay. + + // Do let the state machine create snapshots older than the latest snapshot + latestSnapshotId().ifPresent { latest => + if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { + // Since snapshots are less than the high-watermark absolute offset comparison is okay. + throw new IllegalArgumentException( + s"Attemting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" + ) + } + } + + FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)) } override def readSnapshot(snapshotId: raft.OffsetAndEpoch): Optional[RawSnapshotReader] = { try { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + if (snapshotIds.contains(snapshotId)) { + Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } else { + Optional.empty() + } } catch { - case e: NoSuchFileException => Optional.empty() + case e: NoSuchFileException => + Optional.empty() } } + override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = { + try { + Optional.of(snapshotIds.last) + } catch { + case _: NoSuchElementException => + Optional.empty() + } + } + + override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = { Review comment: Sounds good. In this PR it is not technically the oldest snapshot but it will be when we implement deleting snapshot in https://issues.apache.org/jira/browse/KAFKA-12205 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org